// Copyright 2020-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package server import ( "bytes" "encoding/binary" "encoding/json" "errors" "fmt" "math" "math/rand" "os" "path/filepath" "reflect" "sort" "strings" "sync/atomic" "time" "github.com/klauspost/compress/s2" "github.com/minio/highwayhash" "github.com/nats-io/nuid" ) // jetStreamCluster holds information about the meta group and stream assignments. type jetStreamCluster struct { // The metacontroller raftNode. meta RaftNode // For stream and consumer assignments. All servers will have this be the same. // ACCOUNT -> STREAM -> Stream Assignment -> Consumers streams map[string]map[string]*streamAssignment // These are inflight proposals and used to apply limits when there are // concurrent requests that would otherwise be accepted. // We also record the group for the stream. This is needed since if we have // concurrent requests for same account and stream we need to let it process to get // a response but they need to be same group, peers etc. inflight map[string]map[string]*raftGroup // Signals meta-leader should check the stream assignments. streamsCheck bool // Server. s *Server // Internal client. c *client // Processing assignment results. streamResults *subscription consumerResults *subscription // System level request to have the leader stepdown. stepdown *subscription // System level requests to remove a peer. peerRemove *subscription // System level request to move a stream peerStreamMove *subscription // System level request to cancel a stream move peerStreamCancelMove *subscription // To pop out the monitorCluster before the raft layer. qch chan struct{} } // Used to guide placement of streams and meta controllers in clustered JetStream. type Placement struct { Cluster string `json:"cluster,omitempty"` Tags []string `json:"tags,omitempty"` } // Define types of the entry. type entryOp uint8 // ONLY ADD TO THE END, DO NOT INSERT IN BETWEEN WILL BREAK SERVER INTEROP. const ( // Meta ops. assignStreamOp entryOp = iota assignConsumerOp removeStreamOp removeConsumerOp // Stream ops. streamMsgOp purgeStreamOp deleteMsgOp // Consumer ops. updateDeliveredOp updateAcksOp // Compressed consumer assignments. assignCompressedConsumerOp // Filtered Consumer skip. updateSkipOp // Update Stream. updateStreamOp // For updating information on pending pull requests. addPendingRequest removePendingRequest // For sending compressed streams, either through RAFT or catchup. compressedStreamMsgOp ) // raftGroups are controlled by the metagroup controller. // The raftGroups will house streams and consumers. type raftGroup struct { Name string `json:"name"` Peers []string `json:"peers"` Storage StorageType `json:"store"` Cluster string `json:"cluster,omitempty"` Preferred string `json:"preferred,omitempty"` // Internal node RaftNode } // streamAssignment is what the meta controller uses to assign streams to peers. 
type streamAssignment struct { Client *ClientInfo `json:"client,omitempty"` Created time.Time `json:"created"` Config *StreamConfig `json:"stream"` Group *raftGroup `json:"group"` Sync string `json:"sync"` Subject string `json:"subject"` Reply string `json:"reply"` Restore *StreamState `json:"restore_state,omitempty"` // Internal consumers map[string]*consumerAssignment responded bool recovering bool err error } // consumerAssignment is what the meta controller uses to assign consumers to streams. type consumerAssignment struct { Client *ClientInfo `json:"client,omitempty"` Created time.Time `json:"created"` Name string `json:"name"` Stream string `json:"stream"` Config *ConsumerConfig `json:"consumer"` Group *raftGroup `json:"group"` Subject string `json:"subject"` Reply string `json:"reply"` State *ConsumerState `json:"state,omitempty"` // Internal responded bool recovering bool deleted bool err error } // streamPurge is what the stream leader will replicate when purging a stream. type streamPurge struct { Client *ClientInfo `json:"client,omitempty"` Stream string `json:"stream"` LastSeq uint64 `json:"last_seq"` Subject string `json:"subject"` Reply string `json:"reply"` Request *JSApiStreamPurgeRequest `json:"request,omitempty"` } // streamMsgDelete is what the stream leader will replicate when deleting a message. type streamMsgDelete struct { Client *ClientInfo `json:"client,omitempty"` Stream string `json:"stream"` Seq uint64 `json:"seq"` NoErase bool `json:"no_erase,omitempty"` Subject string `json:"subject"` Reply string `json:"reply"` } const ( defaultStoreDirName = "_js_" defaultMetaGroupName = "_meta_" defaultMetaFSBlkSize = 1024 * 1024 jsExcludePlacement = "!jetstream" ) // Returns information useful in mixed mode. func (s *Server) trackedJetStreamServers() (js, total int) { s.mu.RLock() defer s.mu.RUnlock() if !s.running || !s.eventsEnabled() { return -1, -1 } s.nodeToInfo.Range(func(k, v interface{}) bool { si := v.(nodeInfo) if si.js { js++ } total++ return true }) return js, total } func (s *Server) getJetStreamCluster() (*jetStream, *jetStreamCluster) { s.mu.RLock() shutdown, js := s.shutdown, s.js s.mu.RUnlock() if shutdown || js == nil { return nil, nil } // Only set once, do not need a lock. 
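	// Note: js.cluster is assigned once in setupMetaGroup (under js.mu) and is never cleared
	// afterwards, which is why the unlocked read below should be safe.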
return js, js.cluster } func (s *Server) JetStreamIsClustered() bool { js := s.getJetStream() if js == nil { return false } return js.isClustered() } func (s *Server) JetStreamIsLeader() bool { js := s.getJetStream() if js == nil { return false } js.mu.RLock() defer js.mu.RUnlock() return js.cluster.isLeader() } func (s *Server) JetStreamIsCurrent() bool { js := s.getJetStream() if js == nil { return false } js.mu.RLock() defer js.mu.RUnlock() return js.cluster.isCurrent() } func (s *Server) JetStreamSnapshotMeta() error { js := s.getJetStream() if js == nil { return NewJSNotEnabledError() } js.mu.RLock() cc := js.cluster isLeader := cc.isLeader() meta := cc.meta js.mu.RUnlock() if !isLeader { return errNotLeader } return meta.InstallSnapshot(js.metaSnapshot()) } func (s *Server) JetStreamStepdownStream(account, stream string) error { js, cc := s.getJetStreamCluster() if js == nil { return NewJSNotEnabledError() } if cc == nil { return NewJSClusterNotActiveError() } // Grab account acc, err := s.LookupAccount(account) if err != nil { return err } // Grab stream mset, err := acc.lookupStream(stream) if err != nil { return err } if node := mset.raftNode(); node != nil && node.Leader() { node.StepDown() } return nil } func (s *Server) JetStreamStepdownConsumer(account, stream, consumer string) error { js, cc := s.getJetStreamCluster() if js == nil { return NewJSNotEnabledError() } if cc == nil { return NewJSClusterNotActiveError() } // Grab account acc, err := s.LookupAccount(account) if err != nil { return err } // Grab stream mset, err := acc.lookupStream(stream) if err != nil { return err } o := mset.lookupConsumer(consumer) if o == nil { return NewJSConsumerNotFoundError() } if node := o.raftNode(); node != nil && node.Leader() { node.StepDown() } return nil } func (s *Server) JetStreamSnapshotStream(account, stream string) error { js, cc := s.getJetStreamCluster() if js == nil { return NewJSNotEnabledForAccountError() } if cc == nil { return NewJSClusterNotActiveError() } // Grab account acc, err := s.LookupAccount(account) if err != nil { return err } // Grab stream mset, err := acc.lookupStream(stream) if err != nil { return err } mset.mu.RLock() if !mset.node.Leader() { mset.mu.RUnlock() return NewJSNotEnabledForAccountError() } n := mset.node mset.mu.RUnlock() return n.InstallSnapshot(mset.stateSnapshot()) } func (s *Server) JetStreamClusterPeers() []string { js := s.getJetStream() if js == nil { return nil } js.mu.RLock() defer js.mu.RUnlock() cc := js.cluster if !cc.isLeader() { return nil } peers := cc.meta.Peers() var nodes []string for _, p := range peers { si, ok := s.nodeToInfo.Load(p.ID) if !ok || si == nil { continue } ni := si.(nodeInfo) // Ignore if offline, no JS, or no current stats have been received. if ni.offline || !ni.js || ni.stats == nil { continue } nodes = append(nodes, si.(nodeInfo).name) } return nodes } // Read lock should be held. func (cc *jetStreamCluster) isLeader() bool { if cc == nil { // Non-clustered mode return true } return cc.meta != nil && cc.meta.Leader() } // isCurrent will determine if this node is a leader or an up to date follower. // Read lock should be held. func (cc *jetStreamCluster) isCurrent() bool { if cc == nil { // Non-clustered mode return true } if cc.meta == nil { return false } return cc.meta.Current() } // isStreamCurrent will determine if the stream is up to date. // For R1 it will make sure the stream is present on this server. // Read lock should be held. 
func (cc *jetStreamCluster) isStreamCurrent(account, stream string) bool { if cc == nil { // Non-clustered mode return true } as := cc.streams[account] if as == nil { return false } sa := as[stream] if sa == nil { return false } rg := sa.Group if rg == nil { return false } if rg.node == nil || rg.node.Current() { // Check if we are processing a snapshot and are catching up. acc, err := cc.s.LookupAccount(account) if err != nil { return false } mset, err := acc.lookupStream(stream) if err != nil { return false } if mset.isCatchingUp() { return false } // Success. return true } return false } // isStreamHealthy will determine if the stream is up to date or very close. // For R1 it will make sure the stream is present on this server. // Read lock should be held. func (cc *jetStreamCluster) isStreamHealthy(account, stream string) bool { if cc == nil { // Non-clustered mode return true } as := cc.streams[account] if as == nil { return false } sa := as[stream] if sa == nil { return false } rg := sa.Group if rg == nil { return false } if rg.node == nil || rg.node.Healthy() { // Check if we are processing a snapshot and are catching up. acc, err := cc.s.LookupAccount(account) if err != nil { return false } mset, err := acc.lookupStream(stream) if err != nil { return false } if mset.isCatchingUp() { return false } // Success. return true } return false } // isConsumerCurrent will determine if the consumer is up to date. // For R1 it will make sure the consunmer is present on this server. // Read lock should be held. func (cc *jetStreamCluster) isConsumerCurrent(account, stream, consumer string) bool { if cc == nil { // Non-clustered mode return true } acc, err := cc.s.LookupAccount(account) if err != nil { return false } mset, err := acc.lookupStream(stream) if err != nil { return false } o := mset.lookupConsumer(consumer) if o == nil { return false } if n := o.raftNode(); n != nil && !n.Current() { return false } return true } // subjectsOverlap checks all existing stream assignments for the account cross-cluster for subject overlap // Use only for clustered JetStream // Read lock should be held. 
func (jsc *jetStreamCluster) subjectsOverlap(acc string, subjects []string, osa *streamAssignment) bool { asa := jsc.streams[acc] for _, sa := range asa { // can"t overlap yourself, assume osa pre-checked for deep equal if passed if osa != nil && sa == osa { continue } for _, subj := range sa.Config.Subjects { for _, tsubj := range subjects { if SubjectsCollide(tsubj, subj) { return true } } } } return false } func (a *Account) getJetStreamFromAccount() (*Server, *jetStream, *jsAccount) { a.mu.RLock() jsa := a.js a.mu.RUnlock() if jsa == nil { return nil, nil, nil } jsa.mu.RLock() js := jsa.js jsa.mu.RUnlock() if js == nil { return nil, nil, nil } js.mu.RLock() s := js.srv js.mu.RUnlock() return s, js, jsa } func (s *Server) JetStreamIsStreamLeader(account, stream string) bool { js, cc := s.getJetStreamCluster() if js == nil || cc == nil { return false } js.mu.RLock() defer js.mu.RUnlock() return cc.isStreamLeader(account, stream) } func (a *Account) JetStreamIsStreamLeader(stream string) bool { s, js, jsa := a.getJetStreamFromAccount() if s == nil || js == nil || jsa == nil { return false } js.mu.RLock() defer js.mu.RUnlock() return js.cluster.isStreamLeader(a.Name, stream) } func (s *Server) JetStreamIsStreamCurrent(account, stream string) bool { js, cc := s.getJetStreamCluster() if js == nil { return false } js.mu.RLock() defer js.mu.RUnlock() return cc.isStreamCurrent(account, stream) } func (a *Account) JetStreamIsConsumerLeader(stream, consumer string) bool { s, js, jsa := a.getJetStreamFromAccount() if s == nil || js == nil || jsa == nil { return false } js.mu.RLock() defer js.mu.RUnlock() return js.cluster.isConsumerLeader(a.Name, stream, consumer) } func (s *Server) JetStreamIsConsumerLeader(account, stream, consumer string) bool { js, cc := s.getJetStreamCluster() if js == nil || cc == nil { return false } js.mu.RLock() defer js.mu.RUnlock() return cc.isConsumerLeader(account, stream, consumer) } func (s *Server) enableJetStreamClustering() error { if !s.isRunning() { return nil } js := s.getJetStream() if js == nil { return NewJSNotEnabledForAccountError() } // Already set. if js.cluster != nil { return nil } s.Noticef("Starting JetStream cluster") // We need to determine if we have a stable cluster name and expected number of servers. s.Debugf("JetStream cluster checking for stable cluster name and peers") hasLeafNodeSystemShare := s.canExtendOtherDomain() if s.isClusterNameDynamic() && !hasLeafNodeSystemShare { return errors.New("JetStream cluster requires cluster name") } if s.configuredRoutes() == 0 && !hasLeafNodeSystemShare { return errors.New("JetStream cluster requires configured routes or solicited leafnode for the system account") } return js.setupMetaGroup() } // isClustered returns if we are clustered. // Lock should not be held. func (js *jetStream) isClustered() bool { // This is only ever set, no need for lock here. return js.cluster != nil } // isClusteredNoLock returns if we are clustered, but unlike isClustered() does // not use the jetstream"s lock, instead, uses an atomic operation. // There are situations where some code wants to know if we are clustered but // can"t use js.isClustered() without causing a lock inversion. func (js *jetStream) isClusteredNoLock() bool { return atomic.LoadInt32(&js.clustered) == 1 } func (js *jetStream) setupMetaGroup() error { s := js.srv s.Noticef("Creating JetStream metadata controller") // Setup our WAL for the metagroup. 
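	// A sketch of the resulting on-disk layout, assuming the defaults defined earlier
	// (defaultStoreDirName "_js_", defaultMetaGroupName "_meta_", defaultMetaFSBlkSize 1MB):
	//
	//   <StoreDir>/<system account>/_js_/_meta_/   <- meta group WAL (file store, 1MB blocks)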
sysAcc := s.SystemAccount() storeDir := filepath.Join(js.config.StoreDir, sysAcc.Name, defaultStoreDirName, defaultMetaGroupName) fs, err := newFileStoreWithCreated( FileStoreConfig{StoreDir: storeDir, BlockSize: defaultMetaFSBlkSize, AsyncFlush: false}, StreamConfig{Name: defaultMetaGroupName, Storage: FileStorage}, time.Now().UTC(), s.jsKeyGen(defaultMetaGroupName), ) if err != nil { s.Errorf("Error creating filestore: %v", err) return err } cfg := &RaftConfig{Name: defaultMetaGroupName, Store: storeDir, Log: fs} // If we are soliciting leafnode connections and we are sharing a system account and do not disable it with a hint, // we want to move to observer mode so that we extend the solicited cluster or supercluster but do not form our own. cfg.Observer = s.canExtendOtherDomain() && s.opts.JetStreamExtHint != jsNoExtend var bootstrap bool if ps, err := readPeerState(storeDir); err != nil { s.Noticef("JetStream cluster bootstrapping") bootstrap = true peers := s.ActivePeers() s.Debugf("JetStream cluster initial peers: %+v", peers) if err := s.bootstrapRaftNode(cfg, peers, false); err != nil { return err } if cfg.Observer { s.Noticef("Turning JetStream metadata controller Observer Mode on") } } else { s.Noticef("JetStream cluster recovering state") // correlate the value of observer with observations from a previous run. if cfg.Observer { switch ps.domainExt { case extExtended: s.Noticef("Keeping JetStream metadata controller Observer Mode on - due to previous contact") case extNotExtended: s.Noticef("Turning JetStream metadata controller Observer Mode off - due to previous contact") cfg.Observer = false case extUndetermined: s.Noticef("Turning JetStream metadata controller Observer Mode on - no previous contact") s.Noticef("In cases where JetStream will not be extended") s.Noticef("and waiting for leader election until first contact is not acceptable,") s.Noticef(`manually disable Observer Mode by setting the JetStream Option "extension_hint: %s"`, jsNoExtend) } } else { // To track possible configuration changes, responsible for an altered value of cfg.Observer, // set extension state to undetermined. ps.domainExt = extUndetermined if err := writePeerState(storeDir, ps); err != nil { return err } } } // Start up our meta node. n, err := s.startRaftNode(sysAcc.GetName(), cfg) if err != nil { s.Warnf("Could not start metadata controller: %v", err) return err } // If we are bootstrapped with no state, start campaign early. if bootstrap { n.Campaign() } c := s.createInternalJetStreamClient() sacc := s.SystemAccount() js.mu.Lock() defer js.mu.Unlock() js.cluster = &jetStreamCluster{ meta: n, streams: make(map[string]map[string]*streamAssignment), s: s, c: c, qch: make(chan struct{}), } atomic.StoreInt32(&js.clustered, 1) c.registerWithAccount(sacc) js.srv.startGoRoutine(js.monitorCluster) return nil } func (js *jetStream) getMetaGroup() RaftNode { js.mu.RLock() defer js.mu.RUnlock() if js.cluster == nil { return nil } return js.cluster.meta } func (js *jetStream) server() *Server { js.mu.RLock() s := js.srv js.mu.RUnlock() return s } // Will respond if we do not think we have a metacontroller leader. func (js *jetStream) isLeaderless() bool { js.mu.RLock() defer js.mu.RUnlock() cc := js.cluster if cc == nil || cc.meta == nil { return false } // If we don"t have a leader. // Make sure we have been running for enough time. 
if cc.meta.GroupLeader() == _EMPTY_ && time.Since(cc.meta.Created()) > lostQuorumIntervalDefault { return true } return false } // Will respond iff we are a member and we know we have no leader. func (js *jetStream) isGroupLeaderless(rg *raftGroup) bool { if rg == nil || js == nil { return false } js.mu.RLock() defer js.mu.RUnlock() cc := js.cluster // If we are not a member we can not say.. if cc.meta == nil { return false } if !rg.isMember(cc.meta.ID()) { return false } // Single peer groups always have a leader if we are here. if rg.node == nil { return false } // If we don"t have a leader. if rg.node.GroupLeader() == _EMPTY_ { // Threshold for jetstream startup. const startupThreshold = 5 * time.Second if rg.node.HadPreviousLeader() { // Make sure we have been running long enough to intelligently determine this. if time.Since(js.started) > startupThreshold { return true } } // Make sure we have been running for enough time. if time.Since(rg.node.Created()) > lostQuorumIntervalDefault { return true } } return false } func (s *Server) JetStreamIsStreamAssigned(account, stream string) bool { js, cc := s.getJetStreamCluster() if js == nil || cc == nil { return false } acc, _ := s.LookupAccount(account) if acc == nil { return false } js.mu.RLock() assigned := cc.isStreamAssigned(acc, stream) js.mu.RUnlock() return assigned } // streamAssigned informs us if this server has this stream assigned. func (jsa *jsAccount) streamAssigned(stream string) bool { jsa.mu.RLock() js, acc := jsa.js, jsa.account jsa.mu.RUnlock() if js == nil { return false } js.mu.RLock() assigned := js.cluster.isStreamAssigned(acc, stream) js.mu.RUnlock() return assigned } // Read lock should be held. func (cc *jetStreamCluster) isStreamAssigned(a *Account, stream string) bool { // Non-clustered mode always return true. if cc == nil { return true } if cc.meta == nil { return false } as := cc.streams[a.Name] if as == nil { return false } sa := as[stream] if sa == nil { return false } rg := sa.Group if rg == nil { return false } // Check if we are the leader of this raftGroup assigned to the stream. ourID := cc.meta.ID() for _, peer := range rg.Peers { if peer == ourID { return true } } return false } // Read lock should be held. func (cc *jetStreamCluster) isStreamLeader(account, stream string) bool { // Non-clustered mode always return true. if cc == nil { return true } if cc.meta == nil { return false } var sa *streamAssignment if as := cc.streams[account]; as != nil { sa = as[stream] } if sa == nil { return false } rg := sa.Group if rg == nil { return false } // Check if we are the leader of this raftGroup assigned to the stream. ourID := cc.meta.ID() for _, peer := range rg.Peers { if peer == ourID { if len(rg.Peers) == 1 || rg.node != nil && rg.node.Leader() { return true } } } return false } // Read lock should be held. func (cc *jetStreamCluster) isConsumerLeader(account, stream, consumer string) bool { // Non-clustered mode always return true. if cc == nil { return true } if cc.meta == nil { return false } var sa *streamAssignment if as := cc.streams[account]; as != nil { sa = as[stream] } if sa == nil { return false } // Check if we are the leader of this raftGroup assigned to this consumer. 
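	// Note: for R1 assets there is a single peer and no raft node, so membership alone
	// implies leadership in the check below; the rg.node.Leader() check only applies to R>1 groups.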
ca := sa.consumers[consumer] if ca == nil { return false } rg := ca.Group ourID := cc.meta.ID() for _, peer := range rg.Peers { if peer == ourID { if len(rg.Peers) == 1 || (rg.node != nil && rg.node.Leader()) { return true } } } return false } // Remove the stream `streamName` for the account `accName` from the inflight // proposals map. This is done on success (processStreamAssignment) or on // failure (processStreamAssignmentResults). // (Write) Lock held on entry. func (cc *jetStreamCluster) removeInflightProposal(accName, streamName string) { streams, ok := cc.inflight[accName] if !ok { return } delete(streams, streamName) if len(streams) == 0 { delete(cc.inflight, accName) } } // Return the cluster quit chan. func (js *jetStream) clusterQuitC() chan struct{} { js.mu.RLock() defer js.mu.RUnlock() if js.cluster != nil { return js.cluster.qch } return nil } // Mark that the meta layer is recovering. func (js *jetStream) setMetaRecovering() { js.mu.Lock() defer js.mu.Unlock() if js.cluster != nil { // metaRecovering js.metaRecovering = true } } // Mark that the meta layer is no longer recovering. func (js *jetStream) clearMetaRecovering() { js.mu.Lock() defer js.mu.Unlock() js.metaRecovering = false } // Return whether the meta layer is recovering. func (js *jetStream) isMetaRecovering() bool { js.mu.RLock() defer js.mu.RUnlock() return js.metaRecovering } // During recovery track any stream and consumer delete and update operations. type recoveryUpdates struct { removeStreams map[string]*streamAssignment removeConsumers map[string]*consumerAssignment updateStreams map[string]*streamAssignment updateConsumers map[string]*consumerAssignment } // Called after recovery of the cluster on startup to check for any orphans. // Streams and consumers are recovered from disk, and the meta layer"s mappings // should clean them up, but under crash scenarios there could be orphans. func (js *jetStream) checkForOrphans() { consumerName := func(o *consumer) string { o.mu.RLock() defer o.mu.RUnlock() return o.name } // Can not hold jetstream lock while trying to delete streams or consumers. js.mu.Lock() s, cc := js.srv, js.cluster s.Debugf("JetStream cluster checking for orphans") var streams []*stream var consumers []*consumer for accName, jsa := range js.accounts { asa := cc.streams[accName] for stream, mset := range jsa.streams { if sa := asa[stream]; sa == nil { streams = append(streams, mset) } else { // This one is good, check consumers now. 
				for _, o := range mset.getConsumers() {
					consumer := consumerName(o)
					if sa.consumers[consumer] == nil {
						consumers = append(consumers, o)
					}
				}
			}
		}
	}
	js.mu.Unlock()

	for _, mset := range streams {
		mset.mu.RLock()
		accName, stream := mset.acc.Name, mset.cfg.Name
		mset.mu.RUnlock()
		s.Warnf("Detected orphaned stream '%s > %s', will cleanup", accName, stream)
		if err := mset.delete(); err != nil {
			s.Warnf("Deleting stream encountered an error: %v", err)
		}
	}
	for _, o := range consumers {
		o.mu.RLock()
		accName, mset, consumer := o.acc.Name, o.mset, o.name
		o.mu.RUnlock()
		stream := "N/A"
		if mset != nil {
			mset.mu.RLock()
			stream = mset.cfg.Name
			mset.mu.RUnlock()
		}
		s.Warnf("Detected orphaned consumer '%s > %s > %s', will cleanup", accName, stream, consumer)
		if err := o.delete(); err != nil {
			s.Warnf("Deleting consumer encountered an error: %v", err)
		}
	}
}

func (js *jetStream) monitorCluster() {
	s, n := js.server(), js.getMetaGroup()
	qch, rqch, lch, aq := js.clusterQuitC(), n.QuitC(), n.LeadChangeC(), n.ApplyQ()

	defer s.grWG.Done()

	s.Debugf("Starting metadata monitor")
	defer s.Debugf("Exiting metadata monitor")

	// Make sure to stop the raft group on exit to prevent accidental memory bloat.
	defer n.Stop()

	const compactInterval = 2 * time.Minute
	t := time.NewTicker(compactInterval)
	defer t.Stop()

	// Used to check cold boot cluster when possibly in mixed mode.
	const leaderCheckInterval = time.Second
	lt := time.NewTicker(leaderCheckInterval)
	defer lt.Stop()

	var (
		isLeader     bool
		lastSnap     []byte
		lastSnapTime time.Time
		minSnapDelta = 10 * time.Second
	)

	// Highwayhash key for generating hashes.
	key := make([]byte, 32)
	rand.Read(key)

	// Set to true to start.
	js.setMetaRecovering()

	// Snapshotting function.
	doSnapshot := func() {
		// Suppress during recovery.
		if js.isMetaRecovering() {
			return
		}
		snap := js.metaSnapshot()
		if hash := highwayhash.Sum(snap, key); !bytes.Equal(hash[:], lastSnap) {
			if err := n.InstallSnapshot(snap); err == nil {
				lastSnap, lastSnapTime = hash[:], time.Now()
			} else if err != errNoSnapAvailable && err != errNodeClosed {
				s.Warnf("Error snapshotting JetStream cluster state: %v", err)
			}
		}
	}

	ru := &recoveryUpdates{
		removeStreams:   make(map[string]*streamAssignment),
		removeConsumers: make(map[string]*consumerAssignment),
		updateStreams:   make(map[string]*streamAssignment),
		updateConsumers: make(map[string]*consumerAssignment),
	}

	for {
		select {
		case <-s.quitCh:
			return
		case <-rqch:
			return
		case <-qch:
			// Clean signal from shutdown routine so do best effort attempt to snapshot meta layer.
			doSnapshot()
			// Return the signal back since shutdown will be waiting.
			close(qch)
			return
		case <-aq.ch:
			ces := aq.pop()
			for _, ce := range ces {
				if ce == nil {
					// Signals we have replayed all of our metadata.
					js.clearMetaRecovering()
					// Process any removes that are still valid after recovery.
					for _, ca := range ru.removeConsumers {
						js.processConsumerRemoval(ca)
					}
					for _, sa := range ru.removeStreams {
						js.processStreamRemoval(sa)
					}
					// Process pending updates.
					for _, sa := range ru.updateStreams {
						js.processUpdateStreamAssignment(sa)
					}
					// Now consumers.
					for _, ca := range ru.updateConsumers {
						js.processConsumerAssignment(ca)
					}
					// Clear.
					ru = nil
					s.Debugf("Recovered JetStream cluster metadata")
					js.checkForOrphans()
					continue
				}
				// FIXME(dlc) - Deal with errors.
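				// A rough summary of the snapshot cadence below: always snapshot after meta
				// snapshots, peer entries or stream removals; after consumer removals only if
				// more than minSnapDelta/2 has passed since the last snapshot; otherwise only
				// once the applied bytes have grown well past the last snapshot and at least
				// minSnapDelta has elapsed.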
if didSnap, didStreamRemoval, didConsumerRemoval, err := js.applyMetaEntries(ce.Entries, ru); err == nil { _, nb := n.Applied(ce.Index) if js.hasPeerEntries(ce.Entries) || didSnap || didStreamRemoval { // Since we received one make sure we have our own since we do not store // our meta state outside of raft. doSnapshot() } else if didConsumerRemoval && time.Since(lastSnapTime) > minSnapDelta/2 { doSnapshot() } else if lls := len(lastSnap); nb > uint64(lls*8) && lls > 0 && time.Since(lastSnapTime) > minSnapDelta { doSnapshot() } } } aq.recycle(&ces) case isLeader = <-lch: js.processLeaderChange(isLeader) if isLeader { s.sendInternalMsgLocked(serverStatsPingReqSubj, _EMPTY_, nil, nil) // Install a snapshot as we become leader. js.checkClusterSize() doSnapshot() } case <-t.C: doSnapshot() // Periodically check the cluster size. if n.Leader() { js.checkClusterSize() } case <-lt.C: s.Debugf("Checking JetStream cluster state") // If we have a current leader or had one in the past we can cancel this here since the metaleader // will be in charge of all peer state changes. // For cold boot only. if n.GroupLeader() != _EMPTY_ || n.HadPreviousLeader() { lt.Stop() continue } // If we are here we do not have a leader and we did not have a previous one, so cold start. // Check to see if we can adjust our cluster size down iff we are in mixed mode and we have // seen a total that is what our original estimate was. cs := n.ClusterSize() if js, total := s.trackedJetStreamServers(); js < total && total >= cs && js != cs { s.Noticef("Adjusting JetStream expected peer set size to %d from original %d", js, cs) n.AdjustBootClusterSize(js) } } } } // This is called on first leader transition to double check the peers and cluster set size. func (js *jetStream) checkClusterSize() { s, n := js.server(), js.getMetaGroup() if n == nil { return } // We will check that we have a correct cluster set size by checking for any non-js servers // which can happen in mixed mode. ps := n.(*raft).currentPeerState() if len(ps.knownPeers) >= ps.clusterSize { return } // Grab our active peers. peers := s.ActivePeers() // If we have not registered all of our peers yet we can"t do // any adjustments based on a mixed mode. We will periodically check back. if len(peers) < ps.clusterSize { return } s.Debugf("Checking JetStream cluster size") // If we are here our known set as the leader is not the same as the cluster size. // Check to see if we have a mixed mode setup. var totalJS int for _, p := range peers { if si, ok := s.nodeToInfo.Load(p); ok && si != nil { if si.(nodeInfo).js { totalJS++ } } } // If we have less then our cluster size adjust that here. Can not do individual peer removals since // they will not be in the tracked peers. if totalJS < ps.clusterSize { s.Debugf("Adjusting JetStream cluster size from %d to %d", ps.clusterSize, totalJS) if err := n.AdjustClusterSize(totalJS); err != nil { s.Warnf("Error adjusting JetStream cluster size: %v", err) } } } // Represents our stable meta state that we can write out. 
type writeableStreamAssignment struct { Client *ClientInfo `json:"client,omitempty"` Created time.Time `json:"created"` Config *StreamConfig `json:"stream"` Group *raftGroup `json:"group"` Sync string `json:"sync"` Consumers []*consumerAssignment } func (js *jetStream) clusterStreamConfig(accName, streamName string) (StreamConfig, bool) { js.mu.RLock() defer js.mu.RUnlock() if sa, ok := js.cluster.streams[accName][streamName]; ok { return *sa.Config, true } return StreamConfig{}, false } func (js *jetStream) metaSnapshot() []byte { var streams []writeableStreamAssignment js.mu.RLock() cc := js.cluster for _, asa := range cc.streams { for _, sa := range asa { wsa := writeableStreamAssignment{ Client: sa.Client, Created: sa.Created, Config: sa.Config, Group: sa.Group, Sync: sa.Sync, } for _, ca := range sa.consumers { wsa.Consumers = append(wsa.Consumers, ca) } streams = append(streams, wsa) } } if len(streams) == 0 { js.mu.RUnlock() return nil } b, _ := json.Marshal(streams) js.mu.RUnlock() return s2.EncodeBetter(nil, b) } func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecovering bool) error { var wsas []writeableStreamAssignment if len(buf) > 0 { jse, err := s2.Decode(nil, buf) if err != nil { return err } if err = json.Unmarshal(jse, &wsas); err != nil { return err } } // Build our new version here outside of js. streams := make(map[string]map[string]*streamAssignment) for _, wsa := range wsas { fixCfgMirrorWithDedupWindow(wsa.Config) as := streams[wsa.Client.serviceAccount()] if as == nil { as = make(map[string]*streamAssignment) streams[wsa.Client.serviceAccount()] = as } sa := &streamAssignment{Client: wsa.Client, Created: wsa.Created, Config: wsa.Config, Group: wsa.Group, Sync: wsa.Sync} if len(wsa.Consumers) > 0 { sa.consumers = make(map[string]*consumerAssignment) for _, ca := range wsa.Consumers { sa.consumers[ca.Name] = ca } } as[wsa.Config.Name] = sa } js.mu.Lock() cc := js.cluster var saAdd, saDel, saChk []*streamAssignment // Walk through the old list to generate the delete list. for account, asa := range cc.streams { nasa := streams[account] for sn, sa := range asa { if nsa := nasa[sn]; nsa == nil { saDel = append(saDel, sa) } else { saChk = append(saChk, nsa) } } } // Walk through the new list to generate the add list. for account, nasa := range streams { asa := cc.streams[account] for sn, sa := range nasa { if asa[sn] == nil { saAdd = append(saAdd, sa) } } } // Now walk the ones to check and process consumers. var caAdd, caDel []*consumerAssignment for _, sa := range saChk { // Make sure to add in all the new ones from sa. for _, ca := range sa.consumers { caAdd = append(caAdd, ca) } if osa := js.streamAssignment(sa.Client.serviceAccount(), sa.Config.Name); osa != nil { for _, ca := range osa.consumers { if sa.consumers[ca.Name] == nil { caDel = append(caDel, ca) } else { caAdd = append(caAdd, ca) } } } } js.mu.Unlock() // Do removals first. for _, sa := range saDel { js.setStreamAssignmentRecovering(sa) if isRecovering { key := sa.recoveryKey() ru.removeStreams[key] = sa delete(ru.updateStreams, key) } else { js.processStreamRemoval(sa) } } // Now do add for the streams. Also add in all consumers. for _, sa := range saAdd { js.setStreamAssignmentRecovering(sa) js.processStreamAssignment(sa) // We can simply process the consumers. for _, ca := range sa.consumers { js.setConsumerAssignmentRecovering(ca) js.processConsumerAssignment(ca) } } // Perform updates on those in saChk. These were existing so make // sure to process any changes. 
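	// During recovery the update/remove paths below are not applied right away: they are
	// buffered in the recoveryUpdates maps, keyed by recoveryKey() ("<account>:<stream>" for
	// streams, "<account>:<stream>:<consumer>" for consumers), so a later remove can cancel a
	// pending update and vice versa. monitorCluster replays whatever remains once the meta log
	// has been fully recovered.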
for _, sa := range saChk { js.setStreamAssignmentRecovering(sa) if isRecovering { key := sa.recoveryKey() ru.updateStreams[key] = sa delete(ru.removeStreams, key) } else { js.processUpdateStreamAssignment(sa) } } // Now do the deltas for existing stream"s consumers. for _, ca := range caDel { js.setConsumerAssignmentRecovering(ca) if isRecovering { key := ca.recoveryKey() ru.removeConsumers[key] = ca delete(ru.updateConsumers, key) } else { js.processConsumerRemoval(ca) } } for _, ca := range caAdd { js.setConsumerAssignmentRecovering(ca) if isRecovering { key := ca.recoveryKey() delete(ru.removeConsumers, key) ru.updateConsumers[key] = ca } else { js.processConsumerAssignment(ca) } } return nil } // Called on recovery to make sure we do not process like original. func (js *jetStream) setStreamAssignmentRecovering(sa *streamAssignment) { js.mu.Lock() defer js.mu.Unlock() sa.responded = true sa.recovering = true sa.Restore = nil if sa.Group != nil { sa.Group.Preferred = _EMPTY_ } } // Called on recovery to make sure we do not process like original. func (js *jetStream) setConsumerAssignmentRecovering(ca *consumerAssignment) { js.mu.Lock() defer js.mu.Unlock() ca.responded = true ca.recovering = true if ca.Group != nil { ca.Group.Preferred = _EMPTY_ } } // Just copies over and changes out the group so it can be encoded. // Lock should be held. func (sa *streamAssignment) copyGroup() *streamAssignment { csa, cg := *sa, *sa.Group csa.Group = &cg csa.Group.Peers = copyStrings(sa.Group.Peers) return &csa } // Just copies over and changes out the group so it can be encoded. // Lock should be held. func (ca *consumerAssignment) copyGroup() *consumerAssignment { cca, cg := *ca, *ca.Group cca.Group = &cg cca.Group.Peers = copyStrings(ca.Group.Peers) return &cca } // Lock should be held. func (sa *streamAssignment) missingPeers() bool { return len(sa.Group.Peers) < sa.Config.Replicas } // Called when we detect a new peer. Only the leader will process checking // for any streams, and consequently any consumers. func (js *jetStream) processAddPeer(peer string) { js.mu.Lock() defer js.mu.Unlock() s, cc := js.srv, js.cluster isLeader := cc.isLeader() // Now check if we are meta-leader. We will check for any re-assignments. if !isLeader { return } sir, ok := s.nodeToInfo.Load(peer) if !ok || sir == nil { return } si := sir.(nodeInfo) for _, asa := range cc.streams { for _, sa := range asa { if sa.missingPeers() { // Make sure the right cluster etc. if si.cluster != sa.Client.Cluster { continue } // If we are here we can add in this peer. csa := sa.copyGroup() csa.Group.Peers = append(csa.Group.Peers, peer) // Send our proposal for this csa. Also use same group definition for all the consumers as well. cc.meta.Propose(encodeAddStreamAssignment(csa)) for _, ca := range sa.consumers { // Ephemerals are R=1, so only auto-remap durables, or R>1. if ca.Config.Durable != _EMPTY_ || len(ca.Group.Peers) > 1 { cca := ca.copyGroup() cca.Group.Peers = csa.Group.Peers cc.meta.Propose(encodeAddConsumerAssignment(cca)) } } } } } } func (js *jetStream) processRemovePeer(peer string) { js.mu.Lock() s, cc := js.srv, js.cluster if cc.meta == nil { js.mu.Unlock() return } isLeader := cc.isLeader() // All nodes will check if this is them. isUs := cc.meta.ID() == peer disabled := js.disabled js.mu.Unlock() // We may be already disabled. 
	if disabled {
		return
	}

	if isUs {
		s.Errorf("JetStream being DISABLED, our server was removed from the cluster")
		adv := &JSServerRemovedAdvisory{
			TypedEvent: TypedEvent{
				Type: JSServerRemovedAdvisoryType,
				ID:   nuid.Next(),
				Time: time.Now().UTC(),
			},
			Server:   s.Name(),
			ServerID: s.ID(),
			Cluster:  s.cachedClusterName(),
			Domain:   s.getOpts().JetStreamDomain,
		}
		s.publishAdvisory(nil, JSAdvisoryServerRemoved, adv)
		go s.DisableJetStream()
	}

	// Now check if we are meta-leader. We will attempt re-assignment.
	if !isLeader {
		return
	}

	js.mu.Lock()
	defer js.mu.Unlock()

	for _, asa := range cc.streams {
		for _, sa := range asa {
			if rg := sa.Group; rg.isMember(peer) {
				js.removePeerFromStreamLocked(sa, peer)
			}
		}
	}
}

// Assumes all checks have already been done.
func (js *jetStream) removePeerFromStream(sa *streamAssignment, peer string) bool {
	js.mu.Lock()
	defer js.mu.Unlock()
	return js.removePeerFromStreamLocked(sa, peer)
}

// Lock should be held.
func (js *jetStream) removePeerFromStreamLocked(sa *streamAssignment, peer string) bool {
	if rg := sa.Group; !rg.isMember(peer) {
		return false
	}

	s, cc, csa := js.srv, js.cluster, sa.copyGroup()
	replaced := cc.remapStreamAssignment(csa, peer)
	if !replaced {
		s.Warnf("JetStream cluster could not replace peer for stream '%s > %s'", sa.Client.serviceAccount(), sa.Config.Name)
	}

	// Send our proposal for this csa. Also use same group definition for all the consumers as well.
	cc.meta.Propose(encodeAddStreamAssignment(csa))
	rg := csa.Group
	for _, ca := range sa.consumers {
		// Ephemerals are R=1, so only auto-remap durables, or R>1.
		if ca.Config.Durable != _EMPTY_ {
			cca := ca.copyGroup()
			cca.Group.Peers, cca.Group.Preferred = rg.Peers, _EMPTY_
			cc.meta.Propose(encodeAddConsumerAssignment(cca))
		} else if ca.Group.isMember(peer) {
			// These are ephemerals. Check to see if we deleted this peer.
			cc.meta.Propose(encodeDeleteConsumerAssignment(ca))
		}
	}
	return replaced
}

// Check if we have peer related entries.
func (js *jetStream) hasPeerEntries(entries []*Entry) bool { for _, e := range entries { if e.Type == EntryRemovePeer || e.Type == EntryAddPeer { return true } } return false } const ksep = ":" func (sa *streamAssignment) recoveryKey() string { if sa == nil { return _EMPTY_ } return sa.Client.serviceAccount() + ksep + sa.Config.Name } func (ca *consumerAssignment) recoveryKey() string { if ca == nil { return _EMPTY_ } return ca.Client.serviceAccount() + ksep + ca.Stream + ksep + ca.Name } func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bool, bool, bool, error) { var didSnap, didRemoveStream, didRemoveConsumer bool isRecovering := js.isMetaRecovering() for _, e := range entries { if e.Type == EntrySnapshot { js.applyMetaSnapshot(e.Data, ru, isRecovering) didSnap = true } else if e.Type == EntryRemovePeer { if !isRecovering { js.processRemovePeer(string(e.Data)) } } else if e.Type == EntryAddPeer { if !isRecovering { js.processAddPeer(string(e.Data)) } } else { buf := e.Data switch entryOp(buf[0]) { case assignStreamOp: sa, err := decodeStreamAssignment(buf[1:]) if err != nil { js.srv.Errorf("JetStream cluster failed to decode stream assignment: %q", buf[1:]) return didSnap, didRemoveStream, didRemoveConsumer, err } if isRecovering { js.setStreamAssignmentRecovering(sa) delete(ru.removeStreams, sa.recoveryKey()) } if js.processStreamAssignment(sa) { didRemoveStream = true } case removeStreamOp: sa, err := decodeStreamAssignment(buf[1:]) if err != nil { js.srv.Errorf("JetStream cluster failed to decode stream assignment: %q", buf[1:]) return didSnap, didRemoveStream, didRemoveConsumer, err } if isRecovering { js.setStreamAssignmentRecovering(sa) key := sa.recoveryKey() ru.removeStreams[key] = sa delete(ru.updateStreams, key) } else { js.processStreamRemoval(sa) didRemoveStream = true } case assignConsumerOp: ca, err := decodeConsumerAssignment(buf[1:]) if err != nil { js.srv.Errorf("JetStream cluster failed to decode consumer assignment: %q", buf[1:]) return didSnap, didRemoveStream, didRemoveConsumer, err } if isRecovering { js.setConsumerAssignmentRecovering(ca) key := ca.recoveryKey() delete(ru.removeConsumers, key) ru.updateConsumers[key] = ca } else { js.processConsumerAssignment(ca) } case assignCompressedConsumerOp: ca, err := decodeConsumerAssignmentCompressed(buf[1:]) if err != nil { js.srv.Errorf("JetStream cluster failed to decode compressed consumer assignment: %q", buf[1:]) return didSnap, didRemoveStream, didRemoveConsumer, err } if isRecovering { js.setConsumerAssignmentRecovering(ca) key := ca.recoveryKey() delete(ru.removeConsumers, key) ru.updateConsumers[key] = ca } else { js.processConsumerAssignment(ca) } case removeConsumerOp: ca, err := decodeConsumerAssignment(buf[1:]) if err != nil { js.srv.Errorf("JetStream cluster failed to decode consumer assignment: %q", buf[1:]) return didSnap, didRemoveStream, didRemoveConsumer, err } if isRecovering { js.setConsumerAssignmentRecovering(ca) key := ca.recoveryKey() ru.removeConsumers[key] = ca delete(ru.updateConsumers, key) } else { js.processConsumerRemoval(ca) didRemoveConsumer = true } case updateStreamOp: sa, err := decodeStreamAssignment(buf[1:]) if err != nil { js.srv.Errorf("JetStream cluster failed to decode stream assignment: %q", buf[1:]) return didSnap, didRemoveStream, didRemoveConsumer, err } if isRecovering { js.setStreamAssignmentRecovering(sa) key := sa.recoveryKey() ru.updateStreams[key] = sa delete(ru.removeStreams, key) } else { js.processUpdateStreamAssignment(sa) } default: 
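			// entryOp values are append-only (see the const block at the top of this file),
			// so an unknown op here most likely means the entry was written by a newer,
			// incompatible server version.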
panic("JetStream Cluster Unknown meta entry op type") } } } return didSnap, didRemoveStream, didRemoveConsumer, nil } func (rg *raftGroup) isMember(id string) bool { if rg == nil { return false } for _, peer := range rg.Peers { if peer == id { return true } } return false } func (rg *raftGroup) setPreferred() { if rg == nil || len(rg.Peers) == 0 { return } if len(rg.Peers) == 1 { rg.Preferred = rg.Peers[0] } else { // For now just randomly select a peer for the preferred. pi := rand.Int31n(int32(len(rg.Peers))) rg.Preferred = rg.Peers[pi] } } // createRaftGroup is called to spin up this raft group if needed. func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage StorageType) error { js.mu.Lock() s, cc := js.srv, js.cluster if cc == nil || cc.meta == nil { js.mu.Unlock() return NewJSClusterNotActiveError() } // If this is a single peer raft group or we are not a member return. if len(rg.Peers) <= 1 || !rg.isMember(cc.meta.ID()) { js.mu.Unlock() // Nothing to do here. return nil } // Check if we already have this assigned. if node := s.lookupRaftNode(rg.Name); node != nil { s.Debugf("JetStream cluster already has raft group %q assigned", rg.Name) rg.node = node js.mu.Unlock() return nil } js.mu.Unlock() s.Debugf("JetStream cluster creating raft group:%+v", rg) sysAcc := s.SystemAccount() if sysAcc == nil { s.Debugf("JetStream cluster detected shutdown processing raft group: %+v", rg) return errors.New("shutting down") } // Check here to see if we have a max HA Assets limit set. if maxHaAssets := s.getOpts().JetStreamLimits.MaxHAAssets; maxHaAssets > 0 { if s.numRaftNodes() > maxHaAssets { s.Warnf("Maximum HA Assets limit reached: %d", maxHaAssets) // Since the meta leader assigned this, send a statsz update to them to get them up to date. go s.sendStatszUpdate() return errors.New("system limit reached") } } storeDir := filepath.Join(js.config.StoreDir, sysAcc.Name, defaultStoreDirName, rg.Name) var store StreamStore if storage == FileStorage { fs, err := newFileStoreWithCreated( FileStoreConfig{StoreDir: storeDir, BlockSize: defaultMediumBlockSize, AsyncFlush: false, SyncInterval: 5 * time.Minute}, StreamConfig{Name: rg.Name, Storage: FileStorage}, time.Now().UTC(), s.jsKeyGen(rg.Name), ) if err != nil { s.Errorf("Error creating filestore WAL: %v", err) return err } store = fs } else { ms, err := newMemStore(&StreamConfig{Name: rg.Name, Storage: MemoryStorage}) if err != nil { s.Errorf("Error creating memstore WAL: %v", err) return err } store = ms } cfg := &RaftConfig{Name: rg.Name, Store: storeDir, Log: store, Track: true} if _, err := readPeerState(storeDir); err != nil { s.bootstrapRaftNode(cfg, rg.Peers, true) } n, err := s.startRaftNode(accName, cfg) if err != nil || n == nil { s.Debugf("Error creating raft group: %v", err) return err } // Need locking here for the assignment to avoid data-race reports js.mu.Lock() rg.node = n // See if we are preferred and should start campaign immediately. if n.ID() == rg.Preferred && n.Term() == 0 { n.Campaign() } js.mu.Unlock() return nil } func (mset *stream) raftGroup() *raftGroup { if mset == nil { return nil } mset.mu.RLock() defer mset.mu.RUnlock() if mset.sa == nil { return nil } return mset.sa.Group } func (mset *stream) raftNode() RaftNode { if mset == nil { return nil } mset.mu.RLock() defer mset.mu.RUnlock() return mset.node } func (mset *stream) removeNode() { mset.mu.Lock() defer mset.mu.Unlock() if n := mset.node; n != nil { n.Delete() mset.node = nil } } // Helper function to generate peer info. 
// lists and sets for old and new. func genPeerInfo(peers []string, split int) (newPeers, oldPeers []string, newPeerSet, oldPeerSet map[string]bool) { newPeers = peers[split:] oldPeers = peers[:split] newPeerSet = make(map[string]bool, len(newPeers)) oldPeerSet = make(map[string]bool, len(oldPeers)) for i, peer := range peers { if i < split { oldPeerSet[peer] = true } else { newPeerSet[peer] = true } } return } // Monitor our stream node for this stream. func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnapshot bool) { s, cc := js.server(), js.cluster defer s.grWG.Done() if mset != nil { defer mset.monitorWg.Done() } js.mu.RLock() n := sa.Group.node meta := cc.meta js.mu.RUnlock() if n == nil || meta == nil { s.Warnf("No RAFT group for "%s > %s"", sa.Client.serviceAccount(), sa.Config.Name) return } qch, lch, aq, uch, ourPeerId := n.QuitC(), n.LeadChangeC(), n.ApplyQ(), mset.updateC(), meta.ID() s.Debugf("Starting stream monitor for "%s > %s" [%s]", sa.Client.serviceAccount(), sa.Config.Name, n.Group()) defer s.Debugf("Exiting stream monitor for "%s > %s" [%s]", sa.Client.serviceAccount(), sa.Config.Name, n.Group()) // Make sure to stop the raft group on exit to prevent accidental memory bloat. defer n.Stop() // Make sure we do not leave the apply channel to fill up and block the raft layer. defer func() { if n.State() == Closed { return } if n.Leader() { n.StepDown() } // Drain the commit queue... aq.drain() }() const ( compactInterval = 2 * time.Minute compactSizeMin = 8 * 1024 * 1024 compactNumMin = 65536 minSnapDelta = 10 * time.Second ) // Spread these out for large numbers on server restart. rci := time.Duration(rand.Int63n(int64(time.Minute))) t := time.NewTicker(compactInterval + rci) defer t.Stop() js.mu.RLock() isLeader := cc.isStreamLeader(sa.Client.serviceAccount(), sa.Config.Name) isRestore := sa.Restore != nil js.mu.RUnlock() acc, err := s.LookupAccount(sa.Client.serviceAccount()) if err != nil { s.Warnf("Could not retrieve account for stream "%s > %s"", sa.Client.serviceAccount(), sa.Config.Name) return } accName := acc.GetName() // Hash of the last snapshot (fixed size in memory). var lastSnap []byte var lastSnapTime time.Time // Highwayhash key for generating hashes. key := make([]byte, 32) rand.Read(key) // Should only to be called from leader. doSnapshot := func() { if mset == nil || isRestore || time.Since(lastSnapTime) < minSnapDelta { return } snap := mset.stateSnapshot() ne, nb := n.Size() hash := highwayhash.Sum(snap, key) // If the state hasn"t changed but the log has gone way over // the compaction size then we will want to compact anyway. // This shouldn"t happen for streams like it can for pull // consumers on idle streams but better to be safe than sorry! if !bytes.Equal(hash[:], lastSnap) || ne >= compactNumMin || nb >= compactSizeMin { if err := n.InstallSnapshot(snap); err == nil { lastSnap, lastSnapTime = hash[:], time.Now() } else if err != errNoSnapAvailable && err != errNodeClosed { s.Warnf("Failed to install snapshot for "%s > %s" [%s]: %v", mset.acc.Name, mset.name(), n.Group(), err) } } } // We will establish a restoreDoneCh no matter what. Will never be triggered unless // we replace with the restore chan. restoreDoneCh := make(<-chan error) isRecovering := true // For migration tracking. 
var mmt *time.Ticker var mmtc <-chan time.Time startMigrationMonitoring := func() { if mmt == nil { mmt = time.NewTicker(500 * time.Millisecond) mmtc = mmt.C } } stopMigrationMonitoring := func() { if mmt != nil { mmt.Stop() mmt, mmtc = nil, nil } } defer stopMigrationMonitoring() // This is to optionally track when we are ready as a non-leader for direct access participation. // Either direct or if we are a direct mirror, or both. var dat *time.Ticker var datc <-chan time.Time startDirectAccessMonitoring := func() { if dat == nil { dat = time.NewTicker(1 * time.Second) datc = dat.C } } stopDirectMonitoring := func() { if dat != nil { dat.Stop() dat, datc = nil, nil } } defer stopDirectMonitoring() // This is triggered during a scale up from R1 to clustered mode. We need the new followers to catchup, // similar to how we trigger the catchup mechanism post a backup/restore. // We can arrive here NOT being the leader, so we send the snapshot only if we are, and in this case // reset the notion that we need to send the snapshot. If we are not, then the first time the server // will switch to leader (in the loop below), we will send the snapshot. if sendSnapshot && isLeader && mset != nil && n != nil { n.SendSnapshot(mset.stateSnapshot()) sendSnapshot = false } for { select { case <-s.quitCh: return case <-qch: return case <-aq.ch: var ne, nb uint64 ces := aq.pop() for _, ce := range ces { // No special processing needed for when we are caught up on restart. if ce == nil { isRecovering = false // Check on startup if we should snapshot/compact. if _, b := n.Size(); b > compactSizeMin || n.NeedSnapshot() { doSnapshot() } continue } // Apply our entries. if err := js.applyStreamEntries(mset, ce, isRecovering); err == nil { // Update our applied. ne, nb = n.Applied(ce.Index) } else { s.Warnf("Error applying entries to "%s > %s": %v", accName, sa.Config.Name, err) if isClusterResetErr(err) { if mset.isMirror() && mset.IsLeader() { mset.retryMirrorConsumer() continue } // We will attempt to reset our cluster state. if mset.resetClusteredState(err) { aq.recycle(&ces) return } } else if isOutOfSpaceErr(err) { // If applicable this will tear all of this down, but don"t assume so and return. s.handleOutOfSpace(mset) } } } aq.recycle(&ces) // Check about snapshotting // If we have at least min entries to compact, go ahead and try to snapshot/compact. if ne >= compactNumMin || nb > compactSizeMin { doSnapshot() } case isLeader = <-lch: if isLeader { if sendSnapshot && mset != nil && n != nil { n.SendSnapshot(mset.stateSnapshot()) sendSnapshot = false } if isRestore { acc, _ := s.LookupAccount(sa.Client.serviceAccount()) restoreDoneCh = s.processStreamRestore(sa.Client, acc, sa.Config, _EMPTY_, sa.Reply, _EMPTY_) continue } else if n.NeedSnapshot() { doSnapshot() } // Always cancel if this was running. stopDirectMonitoring() } else if n.GroupLeader() != noLeader { js.setStreamAssignmentRecovering(sa) } // Process our leader change. js.processStreamLeaderChange(mset, isLeader) // We may receive a leader change after the stream assignment which would cancel us // monitoring for this closely. So re-assess our state here as well. // Or the old leader is no longer part of the set and transferred leadership // for this leader to resume with removal migrating := mset.isMigrating() // Check for migrations here. We set the state on the stream assignment update below. 
if isLeader && migrating { startMigrationMonitoring() } // Here we are checking if we are not the leader but we have been asked to allow // direct access. We now allow non-leaders to participate in the queue group. if !isLeader && mset != nil { mset.mu.Lock() // Check direct gets first. if mset.cfg.AllowDirect { if mset.directSub == nil && mset.isCurrent() { mset.subscribeToDirect() } else { startDirectAccessMonitoring() } } // Now check for mirror directs as well. if mset.cfg.MirrorDirect { if mset.mirror != nil && mset.mirror.dsub == nil && mset.isCurrent() { mset.subscribeToMirrorDirect() } else { startDirectAccessMonitoring() } } mset.mu.Unlock() } case <-datc: mset.mu.Lock() ad, md, current := mset.cfg.AllowDirect, mset.cfg.MirrorDirect, mset.isCurrent() if !current { const syncThreshold = 90.0 // We are not current, but current means exactly caught up. Under heavy publish // loads we may never reach this, so check if we are within 90% caught up. _, c, a := mset.node.Progress() if p := float64(a) / float64(c) * 100.0; p < syncThreshold { mset.mu.Unlock() continue } else { s.Debugf("Stream "%s > %s" enabling direct gets at %.0f%% synchronized", sa.Client.serviceAccount(), sa.Config.Name, p) } } // We are current, cancel monitoring and create the direct subs as needed. if ad { mset.subscribeToDirect() } if md { mset.subscribeToMirrorDirect() } mset.mu.Unlock() // Stop monitoring. stopDirectMonitoring() case <-t.C: doSnapshot() // Check is we have preAcks left over if we have become the leader. if isLeader { mset.mu.Lock() if mset.preAcks != nil { mset.preAcks = nil } mset.mu.Unlock() } case <-uch: // keep stream assignment current sa = mset.streamAssignment() // keep peer list up to date with config js.checkPeers(mset.raftGroup()) // We get this when we have a new stream assignment caused by an update. // We want to know if we are migrating. if migrating := mset.isMigrating(); migrating { if isLeader && mmtc == nil { startMigrationMonitoring() } } else { stopMigrationMonitoring() } case <-mmtc: if !isLeader { // We are no longer leader, so not our job. stopMigrationMonitoring() continue } // Check to see where we are.. rg := mset.raftGroup() ci := js.clusterInfo(rg) mset.checkClusterInfo(ci) // Track the new peers and check the ones that are current. mset.mu.RLock() replicas := mset.cfg.Replicas mset.mu.RUnlock() if len(rg.Peers) <= replicas { // Migration no longer happening, so not our job anymore stopMigrationMonitoring() continue } newPeers, oldPeers, newPeerSet, oldPeerSet := genPeerInfo(rg.Peers, len(rg.Peers)-replicas) // If we are part of the new peerset and we have been passed the baton. // We will handle scale down. if newPeerSet[ourPeerId] { // First need to check on any consumers and make sure they have moved properly before scaling down ourselves. js.mu.RLock() var needToWait bool for name, c := range sa.consumers { for _, peer := range c.Group.Peers { // If we have peers still in the old set block. if oldPeerSet[peer] { s.Debugf("Scale down of "%s > %s" blocked by consumer "%s"", accName, sa.Config.Name, name) needToWait = true break } } if needToWait { break } } js.mu.RUnlock() if needToWait { continue } // We are good to go, can scale down here. 
for _, p := range oldPeers { n.ProposeRemovePeer(p) } csa := sa.copyGroup() csa.Group.Peers = newPeers csa.Group.Preferred = ourPeerId csa.Group.Cluster = s.cachedClusterName() cc.meta.ForwardProposal(encodeUpdateStreamAssignment(csa)) s.Noticef("Scaling down "%s > %s" to %+v", accName, sa.Config.Name, s.peerSetToNames(newPeers)) } else { // We are the old leader here, from the original peer set. // We are simply waiting on the new peerset to be caught up so we can transfer leadership. var newLeaderPeer, newLeader string neededCurrent, current := replicas/2+1, 0 for _, r := range ci.Replicas { if r.Current && newPeerSet[r.Peer] { current++ if newLeader == _EMPTY_ { newLeaderPeer, newLeader = r.Peer, r.Name } } } // Check if we have a quorom. if current >= neededCurrent { s.Noticef("Transfer of stream leader for "%s > %s" to "%s"", accName, sa.Config.Name, newLeader) n.StepDown(newLeaderPeer) } } case err := <-restoreDoneCh: // We have completed a restore from snapshot on this server. The stream assignment has // already been assigned but the replicas will need to catch up out of band. Consumers // will need to be assigned by forwarding the proposal and stamping the initial state. s.Debugf("Stream restore for "%s > %s" completed", sa.Client.serviceAccount(), sa.Config.Name) if err != nil { s.Debugf("Stream restore failed: %v", err) } isRestore = false sa.Restore = nil // If we were successful lookup up our stream now. if err == nil { if mset, err = acc.lookupStream(sa.Config.Name); mset != nil { mset.monitorWg.Add(1) defer mset.monitorWg.Done() mset.setStreamAssignment(sa) // Make sure to update our updateC which would have been nil. uch = mset.updateC() } } if err != nil { if mset != nil { mset.delete() } js.mu.Lock() sa.err = err if n != nil { n.Delete() } result := &streamAssignmentResult{ Account: sa.Client.serviceAccount(), Stream: sa.Config.Name, Restore: &JSApiStreamRestoreResponse{ApiResponse: ApiResponse{Type: JSApiStreamRestoreResponseType}}, } result.Restore.Error = NewJSStreamAssignmentError(err, Unless(err)) js.mu.Unlock() // Send response to the metadata leader. They will forward to the user as needed. s.sendInternalMsgLocked(streamAssignmentSubj, _EMPTY_, nil, result) return } if !isLeader { panic("Finished restore but not leader") } // Trigger the stream followers to catchup. if n = mset.raftNode(); n != nil { n.SendSnapshot(mset.stateSnapshot()) } js.processStreamLeaderChange(mset, isLeader) // Check to see if we have restored consumers here. // These are not currently assigned so we will need to do so here. if consumers := mset.getPublicConsumers(); len(consumers) > 0 { for _, o := range consumers { name, cfg := o.String(), o.config() rg := cc.createGroupForConsumer(&cfg, sa) // Pick a preferred leader. rg.setPreferred() // Place our initial state here as well for assignment distribution. state, _ := o.store.State() ca := &consumerAssignment{ Group: rg, Stream: sa.Config.Name, Name: name, Config: &cfg, Client: sa.Client, Created: o.createdTime(), State: state, } // We make these compressed in case state is complex. addEntry := encodeAddConsumerAssignmentCompressed(ca) cc.meta.ForwardProposal(addEntry) // Check to make sure we see the assignment. 
go func() { ticker := time.NewTicker(time.Second) defer ticker.Stop() for range ticker.C { js.mu.RLock() ca, meta := js.consumerAssignment(ca.Client.serviceAccount(), sa.Config.Name, name), cc.meta js.mu.RUnlock() if ca == nil { s.Warnf("Consumer assignment has not been assigned, retrying") if meta != nil { meta.ForwardProposal(addEntry) } else { return } } else { return } } }() } } } } } // Determine if we are migrating. func (mset *stream) isMigrating() bool { if mset == nil { return false } mset.mu.RLock() js, sa := mset.js, mset.sa mset.mu.RUnlock() js.mu.RLock() defer js.mu.RUnlock() // During migration we will always be R>1, even when we start R1. // So if we do not have a group or node we know we are not migrating. if sa == nil || sa.Group == nil || sa.Group.node == nil { return false } // The sign of migration is if our group peer count != configured replica count. if sa.Config.Replicas == len(sa.Group.Peers) { return false } return true } // resetClusteredState is called when a clustered stream had a sequence mismatch and needs to be reset. func (mset *stream) resetClusteredState(err error) bool { mset.mu.RLock() s, js, jsa, sa, acc, node := mset.srv, mset.js, mset.jsa, mset.sa, mset.acc, mset.node stype, isLeader, tierName := mset.cfg.Storage, mset.isLeader(), mset.tier mset.mu.RUnlock() // Step down if we are the leader here. if isLeader && node != nil { node.StepDown() } // Server if js.limitsExceeded(stype) { s.Debugf("Will not reset stream, server resources exceeded") return false } // Account if exceeded, _ := jsa.limitsExceeded(stype, tierName); exceeded { s.Warnf("stream '%s > %s' errored, account resources exceeded", acc, mset.name()) return false } // We delete our raft state. Will recreate. if node != nil { node.Delete() } // Preserve our current state and messages unless we have a first sequence mismatch. shouldDelete := err == errFirstSequenceMismatch mset.stop(shouldDelete, false) if sa != nil { s.Warnf("Resetting stream cluster state for '%s > %s'", sa.Client.serviceAccount(), sa.Config.Name) js.mu.Lock() sa.Group.node = nil js.mu.Unlock() go js.restartClustered(acc, sa) } return true } // This will reset the stream and consumers. // Should be done in a separate go routine. func (js *jetStream) restartClustered(acc *Account, sa *streamAssignment) { // Check and collect consumers first. js.mu.RLock() var consumers []*consumerAssignment if cc := js.cluster; cc != nil && cc.meta != nil { ourID := cc.meta.ID() for _, ca := range sa.consumers { if rg := ca.Group; rg != nil && rg.isMember(ourID) { rg.node = nil // Erase group raft/node state. consumers = append(consumers, ca) } } } js.mu.RUnlock() // Reset stream. js.processClusterCreateStream(acc, sa) // Reset consumers. for _, ca := range consumers { js.processClusterCreateConsumer(ca, nil, false) } } func isControlHdr(hdr []byte) bool { return bytes.HasPrefix(hdr, []byte("NATS/1.0 100 ")) } // Apply our stream entries.
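// Normal entries carry an op byte plus an encoded payload (stream msgs, deletes, purges), snapshot entries carry a streamSnapshot, and remove-peer entries may stop this stream locally.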
func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isRecovering bool) error { for _, e := range ce.Entries { if e.Type == EntryNormal { buf, op := e.Data, entryOp(e.Data[0]) switch op { case streamMsgOp, compressedStreamMsgOp: if mset == nil { continue } s := js.srv mbuf := buf[1:] if op == compressedStreamMsgOp { var err error mbuf, err = s2.Decode(nil, mbuf) if err != nil { panic(err.Error()) } } subject, reply, hdr, msg, lseq, ts, err := decodeStreamMsg(mbuf) if err != nil { if node := mset.raftNode(); node != nil { s.Errorf("JetStream cluster could not decode stream msg for "%s > %s" [%s]", mset.account(), mset.name(), node.Group()) } panic(err.Error()) } // Check for flowcontrol here. if len(msg) == 0 && len(hdr) > 0 && reply != _EMPTY_ && isControlHdr(hdr) { if !isRecovering { mset.sendFlowControlReply(reply) } continue } // Grab last sequence and CLFS. last, clfs := mset.lastSeqAndCLFS() // We can skip if we know this is less than what we already have. if lseq-clfs < last { s.Debugf("Apply stream entries for "%s > %s" skipping message with sequence %d with last of %d", mset.account(), mset.name(), lseq+1-clfs, last) continue } // Skip by hand here since first msg special case. // Reason is sequence is unsigned and for lseq being 0 // the lseq under stream would have to be -1. if lseq == 0 && last != 0 { continue } // Messages to be skipped have no subject or timestamp or msg or hdr. if subject == _EMPTY_ && ts == 0 && len(msg) == 0 && len(hdr) == 0 { // Skip and update our lseq. mset.setLastSeq(mset.store.SkipMsg()) continue } // Process the actual message here. if err := mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts); err != nil { // Only return in place if we are going to reset stream or we are out of space. if isClusterResetErr(err) || isOutOfSpaceErr(err) { return err } s.Debugf("Apply stream entries for "%s > %s" got error processing message: %v", mset.account(), mset.name(), err) } case deleteMsgOp: md, err := decodeMsgDelete(buf[1:]) if err != nil { if node := mset.raftNode(); node != nil { s := js.srv s.Errorf("JetStream cluster could not decode delete msg for "%s > %s" [%s]", mset.account(), mset.name(), node.Group()) } panic(err.Error()) } s, cc := js.server(), js.cluster var removed bool if md.NoErase { removed, err = mset.removeMsg(md.Seq) } else { removed, err = mset.eraseMsg(md.Seq) } // Cluster reset error. 
if err == ErrStoreEOF { return err } if err != nil && !isRecovering { s.Debugf("JetStream cluster failed to delete stream msg %d from "%s > %s": %v", md.Seq, md.Client.serviceAccount(), md.Stream, err) } js.mu.RLock() isLeader := cc.isStreamLeader(md.Client.serviceAccount(), md.Stream) js.mu.RUnlock() if isLeader && !isRecovering { var resp = JSApiMsgDeleteResponse{ApiResponse: ApiResponse{Type: JSApiMsgDeleteResponseType}} if err != nil { resp.Error = NewJSStreamMsgDeleteFailedError(err, Unless(err)) s.sendAPIErrResponse(md.Client, mset.account(), md.Subject, md.Reply, _EMPTY_, s.jsonResponse(resp)) } else if !removed { resp.Error = NewJSSequenceNotFoundError(md.Seq) s.sendAPIErrResponse(md.Client, mset.account(), md.Subject, md.Reply, _EMPTY_, s.jsonResponse(resp)) } else { resp.Success = true s.sendAPIResponse(md.Client, mset.account(), md.Subject, md.Reply, _EMPTY_, s.jsonResponse(resp)) } } case purgeStreamOp: sp, err := decodeStreamPurge(buf[1:]) if err != nil { if node := mset.raftNode(); node != nil { s := js.srv s.Errorf("JetStream cluster could not decode purge msg for "%s > %s" [%s]", mset.account(), mset.name(), node.Group()) } panic(err.Error()) } // Ignore if we are recovering and we have already processed. if isRecovering { if mset.state().FirstSeq <= sp.LastSeq { // Make sure all messages from the purge are gone. mset.store.Compact(sp.LastSeq + 1) } continue } s := js.server() purged, err := mset.purge(sp.Request) if err != nil { s.Warnf("JetStream cluster failed to purge stream %q for account %q: %v", sp.Stream, sp.Client.serviceAccount(), err) } js.mu.RLock() isLeader := js.cluster.isStreamLeader(sp.Client.serviceAccount(), sp.Stream) js.mu.RUnlock() if isLeader && !isRecovering { var resp = JSApiStreamPurgeResponse{ApiResponse: ApiResponse{Type: JSApiStreamPurgeResponseType}} if err != nil { resp.Error = NewJSStreamGeneralError(err, Unless(err)) s.sendAPIErrResponse(sp.Client, mset.account(), sp.Subject, sp.Reply, _EMPTY_, s.jsonResponse(resp)) } else { resp.Purged = purged resp.Success = true s.sendAPIResponse(sp.Client, mset.account(), sp.Subject, sp.Reply, _EMPTY_, s.jsonResponse(resp)) } } default: panic("JetStream Cluster Unknown group entry op type!") } } else if e.Type == EntrySnapshot { if !isRecovering && mset != nil { var snap streamSnapshot if err := json.Unmarshal(e.Data, &snap); err != nil { return err } if !mset.IsLeader() { if err := mset.processSnapshot(&snap); err != nil { return err } } } else if isRecovering && mset != nil { // On recovery, reset CLFS/FAILED. var snap streamSnapshot if err := json.Unmarshal(e.Data, &snap); err != nil { return err } mset.mu.Lock() mset.clfs = snap.Failed mset.mu.Unlock() } } else if e.Type == EntryRemovePeer { js.mu.RLock() var ourID string if js.cluster != nil && js.cluster.meta != nil { ourID = js.cluster.meta.ID() } js.mu.RUnlock() // We only need to do processing if this is us. if peer := string(e.Data); peer == ourID && mset != nil { // Double check here with the registered stream assignment. shouldRemove := true if sa := mset.streamAssignment(); sa != nil && sa.Group != nil { js.mu.RLock() shouldRemove = !sa.Group.isMember(ourID) js.mu.RUnlock() } if shouldRemove { mset.stop(true, false) } } return nil } } return nil } // Returns the PeerInfo for all replicas of a raft node. This is different than node.Peers() // and is used for external facing advisories. 
func (s *Server) replicas(node RaftNode) []*PeerInfo { now := time.Now() var replicas []*PeerInfo for _, rp := range node.Peers() { if sir, ok := s.nodeToInfo.Load(rp.ID); ok && sir != nil { si := sir.(nodeInfo) pi := &PeerInfo{Peer: rp.ID, Name: si.name, Current: rp.Current, Active: now.Sub(rp.Last), Offline: si.offline, Lag: rp.Lag} replicas = append(replicas, pi) } } return replicas } // Will check our node peers and see if we should remove a peer. func (js *jetStream) checkPeers(rg *raftGroup) { js.mu.Lock() defer js.mu.Unlock() // FIXME(dlc) - Single replicas? if rg == nil || rg.node == nil { return } for _, peer := range rg.node.Peers() { if !rg.isMember(peer.ID) { rg.node.ProposeRemovePeer(peer.ID) } } } // Process a leader change for the clustered stream. func (js *jetStream) processStreamLeaderChange(mset *stream, isLeader bool) { if mset == nil { return } sa := mset.streamAssignment() if sa == nil { return } js.mu.Lock() s, account, err := js.srv, sa.Client.serviceAccount(), sa.err client, subject, reply := sa.Client, sa.Subject, sa.Reply hasResponded := sa.responded sa.responded = true peers := copyStrings(sa.Group.Peers) js.mu.Unlock() streamName := mset.name() if isLeader { s.Noticef("JetStream cluster new stream leader for "%s > %s"", account, streamName) s.sendStreamLeaderElectAdvisory(mset) // Check for peer removal and process here if needed. js.checkPeers(sa.Group) mset.checkAllowMsgCompress(peers) } else { // We are stepping down. // Make sure if we are doing so because we have lost quorum that we send the appropriate advisories. if node := mset.raftNode(); node != nil && !node.Quorum() && time.Since(node.Created()) > 5*time.Second { s.sendStreamLostQuorumAdvisory(mset) } } // Tell stream to switch leader status. mset.setLeader(isLeader) if !isLeader || hasResponded { return } acc, _ := s.LookupAccount(account) if acc == nil { return } // Send our response. var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}} if err != nil { resp.Error = NewJSStreamCreateError(err, Unless(err)) s.sendAPIErrResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) } else { resp.StreamInfo = &StreamInfo{ Created: mset.createdTime(), State: mset.state(), Config: mset.config(), Cluster: js.clusterInfo(mset.raftGroup()), Sources: mset.sourcesInfo(), Mirror: mset.mirrorInfo(), } resp.DidCreate = true s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) if node := mset.raftNode(); node != nil { mset.sendCreateAdvisory() } } } // Fixed value ok for now. const lostQuorumAdvInterval = 10 * time.Second // Determines if we should send lost quorum advisory. We throttle these after first one. func (mset *stream) shouldSendLostQuorum() bool { mset.mu.Lock() defer mset.mu.Unlock() if time.Since(mset.lqsent) >= lostQuorumAdvInterval { mset.lqsent = time.Now() return true } return false } func (s *Server) sendStreamLostQuorumAdvisory(mset *stream) { if mset == nil { return } node, stream, acc := mset.raftNode(), mset.name(), mset.account() if node == nil { return } if !mset.shouldSendLostQuorum() { return } s.Warnf("JetStream cluster stream "%s > %s" has NO quorum, stalled", acc.GetName(), stream) subj := JSAdvisoryStreamQuorumLostPre + "." 
+ stream adv := &JSStreamQuorumLostAdvisory{ TypedEvent: TypedEvent{ Type: JSStreamQuorumLostAdvisoryType, ID: nuid.Next(), Time: time.Now().UTC(), }, Stream: stream, Replicas: s.replicas(node), Domain: s.getOpts().JetStreamDomain, } // Send to the user"s account if not the system account. if acc != s.SystemAccount() { s.publishAdvisory(acc, subj, adv) } // Now do system level one. Place account info in adv, and nil account means system. adv.Account = acc.GetName() s.publishAdvisory(nil, subj, adv) } func (s *Server) sendStreamLeaderElectAdvisory(mset *stream) { if mset == nil { return } node, stream, acc := mset.raftNode(), mset.name(), mset.account() if node == nil { return } subj := JSAdvisoryStreamLeaderElectedPre + "." + stream adv := &JSStreamLeaderElectedAdvisory{ TypedEvent: TypedEvent{ Type: JSStreamLeaderElectedAdvisoryType, ID: nuid.Next(), Time: time.Now().UTC(), }, Stream: stream, Leader: s.serverNameForNode(node.GroupLeader()), Replicas: s.replicas(node), Domain: s.getOpts().JetStreamDomain, } // Send to the user"s account if not the system account. if acc != s.SystemAccount() { s.publishAdvisory(acc, subj, adv) } // Now do system level one. Place account info in adv, and nil account means system. adv.Account = acc.GetName() s.publishAdvisory(nil, subj, adv) } // Will lookup a stream assignment. // Lock should be held. func (js *jetStream) streamAssignment(account, stream string) (sa *streamAssignment) { cc := js.cluster if cc == nil { return nil } if as := cc.streams[account]; as != nil { sa = as[stream] } return sa } // processStreamAssignment is called when followers have replicated an assignment. func (js *jetStream) processStreamAssignment(sa *streamAssignment) bool { js.mu.Lock() s, cc := js.srv, js.cluster accName, stream := sa.Client.serviceAccount(), sa.Config.Name noMeta := cc == nil || cc.meta == nil var ourID string if !noMeta { ourID = cc.meta.ID() } var isMember bool if sa.Group != nil && ourID != _EMPTY_ { isMember = sa.Group.isMember(ourID) } // Remove this stream from the inflight proposals cc.removeInflightProposal(accName, sa.Config.Name) if s == nil || noMeta { js.mu.Unlock() return false } accStreams := cc.streams[accName] if accStreams == nil { accStreams = make(map[string]*streamAssignment) } else if osa := accStreams[stream]; osa != nil && osa != sa { // Copy over private existing state from former SA. sa.Group.node = osa.Group.node sa.consumers = osa.consumers sa.responded = osa.responded sa.err = osa.err } // Update our state. accStreams[stream] = sa cc.streams[accName] = accStreams hasResponded := sa.responded js.mu.Unlock() acc, err := s.LookupAccount(accName) if err != nil { ll := fmt.Sprintf("Account [%s] lookup for stream create failed: %v", accName, err) if isMember { if !hasResponded { // If we can not lookup the account and we are a member, send this result back to the metacontroller leader. result := &streamAssignmentResult{ Account: accName, Stream: stream, Response: &JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}, } result.Response.Error = NewJSNoAccountError() s.sendInternalMsgLocked(streamAssignmentSubj, _EMPTY_, nil, result) } s.Warnf(ll) } else { s.Debugf(ll) } return false } var didRemove bool // Check if this is for us.. if isMember { js.processClusterCreateStream(acc, sa) } else if mset, _ := acc.lookupStream(sa.Config.Name); mset != nil { // We have one here even though we are not a member. This can happen on re-assignment. 
s.removeStream(ourID, mset, sa) } // If this stream assignment does not have a sync subject (bug) set that the meta-leader should check when elected. if sa.Sync == _EMPTY_ { js.mu.Lock() cc.streamsCheck = true js.mu.Unlock() return false } return didRemove } // processUpdateStreamAssignment is called when followers have replicated an updated assignment. func (js *jetStream) processUpdateStreamAssignment(sa *streamAssignment) { js.mu.RLock() s, cc := js.srv, js.cluster js.mu.RUnlock() if s == nil || cc == nil { // TODO(dlc) - debug at least return } accName := sa.Client.serviceAccount() stream := sa.Config.Name js.mu.Lock() if cc.meta == nil { js.mu.Unlock() return } ourID := cc.meta.ID() var isMember bool if sa.Group != nil { isMember = sa.Group.isMember(ourID) } accStreams := cc.streams[accName] if accStreams == nil { js.mu.Unlock() return } osa := accStreams[stream] if osa == nil { js.mu.Unlock() return } // Copy over private existing state from former SA. sa.Group.node = osa.Group.node sa.consumers = osa.consumers sa.err = osa.err // If we detect we are scaling down to 1, non-clustered, and we had a previous node, clear it here. if sa.Config.Replicas == 1 && sa.Group.node != nil { sa.Group.node = nil } // Update our state. accStreams[stream] = sa cc.streams[accName] = accStreams // Make sure we respond if we are a member. if isMember { sa.responded = false } else { // Make sure to clean up any old node in case this stream moves back here. sa.Group.node = nil } js.mu.Unlock() acc, err := s.LookupAccount(accName) if err != nil { s.Warnf("Update Stream Account %s, error on lookup: %v", accName, err) return } // Check if this is for us.. if isMember { js.processClusterUpdateStream(acc, osa, sa) } else if mset, _ := acc.lookupStream(sa.Config.Name); mset != nil { // We have one here even though we are not a member. This can happen on re-assignment. s.removeStream(ourID, mset, sa) } } // Common function to remove ourself from this server. // This can happen on re-assignment, move, etc func (s *Server) removeStream(ourID string, mset *stream, nsa *streamAssignment) { if mset == nil { return } // Make sure to use the new stream assignment, not our own. s.Debugf("JetStream removing stream "%s > %s" from this server", nsa.Client.serviceAccount(), nsa.Config.Name) if node := mset.raftNode(); node != nil { if node.Leader() { node.StepDown(nsa.Group.Preferred) } node.ProposeRemovePeer(ourID) // shut down monitor by shutting down raft node.Delete() } // Make sure this node is no longer attached to our stream assignment. if js, _ := s.getJetStreamCluster(); js != nil { js.mu.Lock() nsa.Group.node = nil js.mu.Unlock() } // wait for monitor to be shut down mset.monitorWg.Wait() mset.stop(true, false) } // processClusterUpdateStream is called when we have a stream assignment that // has been updated for an existing assignment and we are a member. func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAssignment) { if sa == nil { return } js.mu.Lock() s, rg := js.srv, sa.Group client, subject, reply := sa.Client, sa.Subject, sa.Reply alreadyRunning, numReplicas := osa.Group.node != nil, len(rg.Peers) needsNode := rg.node == nil storage, cfg := sa.Config.Storage, sa.Config hasResponded := sa.responded sa.responded = true recovering := sa.recovering js.mu.Unlock() mset, err := acc.lookupStream(cfg.Name) if err == nil && mset != nil { // Make sure we have not had a new group assigned to us. 
if osa.Group.Name != sa.Group.Name { s.Warnf("JetStream cluster detected stream remapping for "%s > %s" from %q to %q", acc, cfg.Name, osa.Group.Name, sa.Group.Name) mset.removeNode() alreadyRunning, needsNode = false, true // Make sure to clear from original. js.mu.Lock() osa.Group.node = nil js.mu.Unlock() } var needsSetLeader bool if !alreadyRunning && numReplicas > 1 { if needsNode { mset.setLeader(false) js.createRaftGroup(acc.GetName(), rg, storage) } mset.monitorWg.Add(1) // Start monitoring.. s.startGoRoutine(func() { js.monitorStream(mset, sa, needsNode) }) } else if numReplicas == 1 && alreadyRunning { // We downgraded to R1. Make sure we cleanup the raft node and the stream monitor. mset.removeNode() // Make sure we are leader now that we are R1. needsSetLeader = true // In case we need to shutdown the cluster specific subs, etc. mset.setLeader(false) js.mu.Lock() rg.node = nil js.mu.Unlock() } // Call update. if err = mset.updateWithAdvisory(cfg, !recovering); err != nil { s.Warnf("JetStream cluster error updating stream %q for account %q: %v", cfg.Name, acc.Name, err) } // Set the new stream assignment. mset.setStreamAssignment(sa) // Make sure we are the leader now that we are R1. if needsSetLeader { mset.setLeader(true) } } // If not found we must be expanding into this node since if we are here we know we are a member. if err == ErrJetStreamStreamNotFound { js.processStreamAssignment(sa) return } if err != nil { js.mu.Lock() sa.err = err result := &streamAssignmentResult{ Account: sa.Client.serviceAccount(), Stream: sa.Config.Name, Response: &JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}, Update: true, } result.Response.Error = NewJSStreamGeneralError(err, Unless(err)) js.mu.Unlock() // Send response to the metadata leader. They will forward to the user as needed. s.sendInternalMsgLocked(streamAssignmentSubj, _EMPTY_, nil, result) return } isLeader := mset.IsLeader() // Check for missing syncSubject bug. if isLeader && osa != nil && osa.Sync == _EMPTY_ { if node := mset.raftNode(); node != nil { node.StepDown() } return } // If we were a single node being promoted assume leadership role for purpose of responding. if !hasResponded && !isLeader && !alreadyRunning { isLeader = true } // Check if we should bail. if !isLeader || hasResponded || recovering { return } // Send our response. var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}} resp.StreamInfo = &StreamInfo{ Created: mset.createdTime(), State: mset.state(), Config: mset.config(), Cluster: js.clusterInfo(mset.raftGroup()), Mirror: mset.mirrorInfo(), Sources: mset.sourcesInfo(), } s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) } // processClusterCreateStream is called when we have a stream assignment that // has been committed and this server is a member of the peer group. func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignment) { if sa == nil { return } js.mu.RLock() s, rg := js.srv, sa.Group alreadyRunning := rg.node != nil storage := sa.Config.Storage js.mu.RUnlock() // Process the raft group and make sure it"s running if needed. err := js.createRaftGroup(acc.GetName(), rg, storage) // If we are restoring, create the stream if we are R>1 and not the preferred who handles the // receipt of the snapshot itself. 
shouldCreate := true if sa.Restore != nil { if len(rg.Peers) == 1 || rg.node != nil && rg.node.ID() == rg.Preferred { shouldCreate = false } else { sa.Restore = nil } } // Our stream. var mset *stream // Process here if not restoring or not the leader. if shouldCreate && err == nil { // Go ahead and create or update the stream. mset, err = acc.lookupStream(sa.Config.Name) if err == nil && mset != nil { osa := mset.streamAssignment() // If we already have a stream assignment and they are the same exact config, short circuit here. if osa != nil { if reflect.DeepEqual(osa.Config, sa.Config) { if sa.Group.Name == osa.Group.Name && reflect.DeepEqual(sa.Group.Peers, osa.Group.Peers) { // Since this already exists we know it succeeded, just respond to this caller. js.mu.RLock() client, subject, reply, recovering := sa.Client, sa.Subject, sa.Reply, sa.recovering js.mu.RUnlock() if !recovering { var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}} resp.StreamInfo = &StreamInfo{ Created: mset.createdTime(), State: mset.state(), Config: mset.config(), Cluster: js.clusterInfo(mset.raftGroup()), Sources: mset.sourcesInfo(), Mirror: mset.mirrorInfo(), } s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) } return } else { // We had a bug where we could have multiple assignments for the same // stream but with different group assignments, including multiple raft // groups. So check for that here. We can only bet on the last one being // consistent in the long run, so let it continue if we see this condition. s.Warnf("JetStream cluster detected duplicate assignment for stream %q for account %q", sa.Config.Name, acc.Name) if osa.Group.node != nil && osa.Group.node != sa.Group.node { osa.Group.node.Delete() osa.Group.node = nil } } } } mset.setStreamAssignment(sa) if err = mset.updateWithAdvisory(sa.Config, false); err != nil { s.Warnf("JetStream cluster error updating stream %q for account %q: %v", sa.Config.Name, acc.Name, err) if osa != nil { // Process the raft group and make sure it"s running if needed. js.createRaftGroup(acc.GetName(), osa.Group, storage) mset.setStreamAssignment(osa) } if rg.node != nil { rg.node.Delete() rg.node = nil } } } else if err == NewJSStreamNotFoundError() { // Add in the stream here. mset, err = acc.addStreamWithAssignment(sa.Config, nil, sa) } if mset != nil { mset.setCreatedTime(sa.Created) } } // This is an error condition. if err != nil { if IsNatsErr(err, JSStreamStoreFailedF) { s.Warnf("Stream create failed for "%s > %s": %v", sa.Client.serviceAccount(), sa.Config.Name, err) err = errStreamStoreFailed } js.mu.Lock() sa.err = err hasResponded := sa.responded // If out of space do nothing for now. if isOutOfSpaceErr(err) { hasResponded = true } if rg.node != nil { rg.node.Delete() } var result *streamAssignmentResult if !hasResponded { result = &streamAssignmentResult{ Account: sa.Client.serviceAccount(), Stream: sa.Config.Name, Response: &JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}, } result.Response.Error = NewJSStreamCreateError(err, Unless(err)) } js.mu.Unlock() // Send response to the metadata leader. They will forward to the user as needed. if result != nil { s.sendInternalMsgLocked(streamAssignmentSubj, _EMPTY_, nil, result) } return } // Start our monitoring routine. 
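// Clustered streams get a dedicated monitor goroutine below; a single replica (no raft node) is handled inline, including any pending restore and the leader change processing.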
if rg.node != nil { if !alreadyRunning { if mset != nil { mset.monitorWg.Add(1) } s.startGoRoutine(func() { js.monitorStream(mset, sa, false) }) } } else { // Single replica stream, process manually here. // If we are restoring, process that first. if sa.Restore != nil { // We are restoring a stream here. restoreDoneCh := s.processStreamRestore(sa.Client, acc, sa.Config, _EMPTY_, sa.Reply, _EMPTY_) s.startGoRoutine(func() { defer s.grWG.Done() select { case err := <-restoreDoneCh: if err == nil { mset, err = acc.lookupStream(sa.Config.Name) if mset != nil { mset.setStreamAssignment(sa) mset.setCreatedTime(sa.Created) } } if err != nil { if mset != nil { mset.delete() } js.mu.Lock() sa.err = err result := &streamAssignmentResult{ Account: sa.Client.serviceAccount(), Stream: sa.Config.Name, Restore: &JSApiStreamRestoreResponse{ApiResponse: ApiResponse{Type: JSApiStreamRestoreResponseType}}, } result.Restore.Error = NewJSStreamRestoreError(err, Unless(err)) js.mu.Unlock() // Send response to the metadata leader. They will forward to the user as needed. b, _ := json.Marshal(result) // Avoids auto-processing and doing fancy json with newlines. s.sendInternalMsgLocked(streamAssignmentSubj, _EMPTY_, nil, b) return } js.processStreamLeaderChange(mset, true) // Check to see if we have restored consumers here. // These are not currently assigned so we will need to do so here. if consumers := mset.getPublicConsumers(); len(consumers) > 0 { js.mu.RLock() cc := js.cluster js.mu.RUnlock() for _, o := range consumers { name, cfg := o.String(), o.config() rg := cc.createGroupForConsumer(&cfg, sa) // Place our initial state here as well for assignment distribution. ca := &consumerAssignment{ Group: rg, Stream: sa.Config.Name, Name: name, Config: &cfg, Client: sa.Client, Created: o.createdTime(), } addEntry := encodeAddConsumerAssignment(ca) cc.meta.ForwardProposal(addEntry) // Check to make sure we see the assignment. go func() { ticker := time.NewTicker(time.Second) defer ticker.Stop() for range ticker.C { js.mu.RLock() ca, meta := js.consumerAssignment(ca.Client.serviceAccount(), sa.Config.Name, name), cc.meta js.mu.RUnlock() if ca == nil { s.Warnf("Consumer assignment has not been assigned, retrying") if meta != nil { meta.ForwardProposal(addEntry) } else { return } } else { return } } }() } } case <-s.quitCh: return } }) } else { js.processStreamLeaderChange(mset, true) } } } // processStreamRemoval is called when followers have replicated an assignment. func (js *jetStream) processStreamRemoval(sa *streamAssignment) { js.mu.Lock() s, cc := js.srv, js.cluster if s == nil || cc == nil || cc.meta == nil { // TODO(dlc) - debug at least js.mu.Unlock() return } stream := sa.Config.Name isMember := sa.Group.isMember(cc.meta.ID()) wasLeader := cc.isStreamLeader(sa.Client.serviceAccount(), stream) // Check if we already have this assigned. 
accStreams := cc.streams[sa.Client.serviceAccount()] needDelete := accStreams != nil && accStreams[stream] != nil if needDelete { delete(accStreams, stream) if len(accStreams) == 0 { delete(cc.streams, sa.Client.serviceAccount()) } } js.mu.Unlock() if needDelete { js.processClusterDeleteStream(sa, isMember, wasLeader) } } func (js *jetStream) processClusterDeleteStream(sa *streamAssignment, isMember, wasLeader bool) { if sa == nil { return } js.mu.RLock() s := js.srv node := sa.Group.node hadLeader := node == nil || node.GroupLeader() != noLeader offline := s.allPeersOffline(sa.Group) var isMetaLeader bool if cc := js.cluster; cc != nil { isMetaLeader = cc.isLeader() } recovering := sa.recovering js.mu.RUnlock() stopped := false var resp = JSApiStreamDeleteResponse{ApiResponse: ApiResponse{Type: JSApiStreamDeleteResponseType}} var err error var acc *Account // Go ahead and delete the stream if we have it and the account here. if acc, _ = s.LookupAccount(sa.Client.serviceAccount()); acc != nil { if mset, _ := acc.lookupStream(sa.Config.Name); mset != nil { // shut down monitor by shutting down raft if n := mset.raftNode(); n != nil { n.Delete() } // wait for monitor to be shut down mset.monitorWg.Wait() err = mset.stop(true, wasLeader) stopped = true } } // Always delete the node if present. if node != nil { node.Delete() } // This is a stop gap cleanup in case // 1) the account does not exist (and mset couldn"t be stopped) and/or // 2) node was nil (and couldn"t be deleted) if !stopped || node == nil { if sacc := s.SystemAccount(); sacc != nil { os.RemoveAll(filepath.Join(js.config.StoreDir, sacc.GetName(), defaultStoreDirName, sa.Group.Name)) // cleanup dependent consumer groups if !stopped { for _, ca := range sa.consumers { os.RemoveAll(filepath.Join(js.config.StoreDir, sacc.GetName(), defaultStoreDirName, ca.Group.Name)) } } } } accDir := filepath.Join(js.config.StoreDir, sa.Client.serviceAccount()) streamDir := filepath.Join(accDir, streamsDir) os.RemoveAll(filepath.Join(streamDir, sa.Config.Name)) // no op if not empty os.Remove(streamDir) os.Remove(accDir) // Normally we want only the leader to respond here, but if we had no leader then all members will respond to make // sure we get feedback to the user. if !isMember || (hadLeader && !wasLeader) { // If all the peers are offline and we are the meta leader we will also respond, so suppress returning here. if !(offline && isMetaLeader) { return } } // Do not respond if the account does not exist any longer if acc == nil || recovering { return } if err != nil { resp.Error = NewJSStreamGeneralError(err, Unless(err)) s.sendAPIErrResponse(sa.Client, acc, sa.Subject, sa.Reply, _EMPTY_, s.jsonResponse(resp)) } else { resp.Success = true s.sendAPIResponse(sa.Client, acc, sa.Subject, sa.Reply, _EMPTY_, s.jsonResponse(resp)) } } // processConsumerAssignment is called when followers have replicated an assignment for a consumer. func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) { js.mu.RLock() s, cc := js.srv, js.cluster accName, stream, consumerName := ca.Client.serviceAccount(), ca.Stream, ca.Name noMeta := cc == nil || cc.meta == nil var ourID string if !noMeta { ourID = cc.meta.ID() } var isMember bool if ca.Group != nil && ourID != _EMPTY_ { isMember = ca.Group.isMember(ourID) } js.mu.RUnlock() if s == nil || noMeta { return } sa := js.streamAssignment(accName, stream) if sa == nil { s.Debugf("Consumer create failed, could not locate stream "%s > %s"", accName, stream) return } // Might need this below. 
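// Note: numReplicas below is the stream's configured replica count; a consumer group whose peer count differs from it is treated as migrating when picking a stepdown target.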
numReplicas := sa.Config.Replicas // Track if this existed already. var wasExisting bool // Check if we have an existing consumer assignment. js.mu.Lock() if sa.consumers == nil { sa.consumers = make(map[string]*consumerAssignment) } else if oca := sa.consumers[ca.Name]; oca != nil { wasExisting = true // Copy over private existing state from former SA. ca.Group.node = oca.Group.node ca.responded = oca.responded ca.err = oca.err } // Capture the optional state. We will pass it along if we are a member to apply. // This is only applicable when restoring a stream with consumers. state := ca.State ca.State = nil // Place into our internal map under the stream assignment. // Ok to replace an existing one, we check on process call below. sa.consumers[ca.Name] = ca js.mu.Unlock() acc, err := s.LookupAccount(accName) if err != nil { ll := fmt.Sprintf("Account [%s] lookup for consumer create failed: %v", accName, err) if isMember { if !js.isMetaRecovering() { // If we cannot look up the account and we are a member, send this result back to the metacontroller leader. result := &consumerAssignmentResult{ Account: accName, Stream: stream, Consumer: consumerName, Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}, } result.Response.Error = NewJSNoAccountError() s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, result) } s.Warnf(ll) } else { s.Debugf(ll) } return } // Check if this is for us. if isMember { js.processClusterCreateConsumer(ca, state, wasExisting) } else { // We need to be removed here, we are no longer assigned. // Grab consumer if we have it. var o *consumer if mset, _ := acc.lookupStream(sa.Config.Name); mset != nil { o = mset.lookupConsumer(ca.Name) } // Check if we have a raft node running, meaning we are no longer part of the group but were. js.mu.Lock() if node := ca.Group.node; node != nil { // We have one here even though we are not a member. This can happen on re-assignment. s.Debugf("JetStream removing consumer '%s > %s > %s' from this server", sa.Client.serviceAccount(), sa.Config.Name, ca.Name) if node.Leader() { s.Debugf("JetStream consumer '%s > %s > %s' is being removed and was the leader, will perform stepdown", sa.Client.serviceAccount(), sa.Config.Name, ca.Name) peers, cn := node.Peers(), s.cachedClusterName() migrating := numReplicas != len(peers) // Select a new peer to transfer to. If we are migrating, make sure it's from the new cluster. var npeer string for _, r := range peers { if !r.Current { continue } if !migrating { npeer = r.ID break } else if sir, ok := s.nodeToInfo.Load(r.ID); ok && sir != nil { si := sir.(nodeInfo) if si.cluster != cn { npeer = r.ID break } } } // Clear the raftnode from our consumer so that a subsequent o.delete will not also issue a stepdown. if o != nil { o.clearRaftNode() } // Manually handle the stepdown and deletion of the node. node.UpdateKnownPeers(ca.Group.Peers) node.StepDown(npeer) node.Delete() } else { node.UpdateKnownPeers(ca.Group.Peers) } } // Always clear the old node. ca.Group.node = nil ca.err = nil js.mu.Unlock() if o != nil { o.deleteWithoutAdvisory() } } } func (js *jetStream) processConsumerRemoval(ca *consumerAssignment) { js.mu.Lock() s, cc := js.srv, js.cluster if s == nil || cc == nil || cc.meta == nil { // TODO(dlc) - debug at least js.mu.Unlock() return } isMember := ca.Group.isMember(cc.meta.ID()) wasLeader := cc.isConsumerLeader(ca.Client.serviceAccount(), ca.Stream, ca.Name) // Delete from our state.
var needDelete bool if accStreams := cc.streams[ca.Client.serviceAccount()]; accStreams != nil { if sa := accStreams[ca.Stream]; sa != nil && sa.consumers != nil && sa.consumers[ca.Name] != nil { needDelete = true delete(sa.consumers, ca.Name) } } js.mu.Unlock() if needDelete { js.processClusterDeleteConsumer(ca, isMember, wasLeader) } } type consumerAssignmentResult struct { Account string `json:"account"` Stream string `json:"stream"` Consumer string `json:"consumer"` Response *JSApiConsumerCreateResponse `json:"response,omitempty"` } // processClusterCreateConsumer is when we are a member of the group and need to create the consumer. func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state *ConsumerState, wasExisting bool) { if ca == nil { return } js.mu.RLock() s := js.srv rg := ca.Group alreadyRunning := rg != nil && rg.node != nil accName, stream, consumer := ca.Client.serviceAccount(), ca.Stream, ca.Name js.mu.RUnlock() acc, err := s.LookupAccount(accName) if err != nil { s.Warnf("JetStream cluster failed to lookup account %q: %v", accName, err) return } // Go ahead and create or update the consumer. mset, err := acc.lookupStream(stream) if err != nil { if !js.isMetaRecovering() { js.mu.Lock() s.Warnf("Consumer create failed, could not locate stream '%s > %s > %s'", ca.Client.serviceAccount(), ca.Stream, ca.Name) ca.err = NewJSStreamNotFoundError() result := &consumerAssignmentResult{ Account: ca.Client.serviceAccount(), Stream: ca.Stream, Consumer: ca.Name, Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}, } result.Response.Error = NewJSStreamNotFoundError() s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, result) js.mu.Unlock() } return } // Check if we already have this consumer running. o := mset.lookupConsumer(consumer) if !alreadyRunning { // Process the raft group and make sure it's running if needed. storage := mset.config().Storage if ca.Config.MemoryStorage { storage = MemoryStorage } js.createRaftGroup(accName, rg, storage) } else { // If we are clustered, update the known peers. js.mu.RLock() if node := rg.node; node != nil { node.UpdateKnownPeers(ca.Group.Peers) } js.mu.RUnlock() } // Check if we already have this consumer running. var didCreate, isConfigUpdate, needsLocalResponse bool if o == nil { // Add in the consumer if needed. if o, err = mset.addConsumerWithAssignment(ca.Config, ca.Name, ca, false); err == nil { didCreate = true } } else { // This consumer exists. // Only update if config is really different. cfg := o.config() if isConfigUpdate = !reflect.DeepEqual(&cfg, ca.Config); isConfigUpdate { // Call into update, ignore consumer exists error here since this means an old deliver subject is bound // which can happen on restart etc. if err := o.updateConfig(ca.Config); err != nil && err != NewJSConsumerNameExistError() { // This is essentially an update that has failed. Respond back to metaleader if we are not recovering.
js.mu.RLock() if !js.metaRecovering { result := &consumerAssignmentResult{ Account: accName, Stream: stream, Consumer: consumer, Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}, } result.Response.Error = NewJSConsumerNameExistError() s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, result) } s.Warnf("Consumer create failed during update for "%s > %s > %s": %v", ca.Client.serviceAccount(), ca.Stream, ca.Name, err) js.mu.RUnlock() return } } var sendState bool js.mu.RLock() n := rg.node // Check if we already had a consumer assignment and its still pending. cca, oca := ca, o.consumerAssignment() if oca != nil { if !oca.responded { // We can"t override info for replying here otherwise leader once elected can not respond. // So copy over original client and the reply from the old ca. cac := *ca cac.Client = oca.Client cac.Reply = oca.Reply cca = &cac needsLocalResponse = true } // If we look like we are scaling up, let"s send our current state to the group. sendState = len(ca.Group.Peers) > len(oca.Group.Peers) && o.IsLeader() && n != nil // Signal that this is an update if ca.Reply != _EMPTY_ { isConfigUpdate = true } } js.mu.RUnlock() if sendState { if snap, err := o.store.EncodedState(); err == nil { n.SendSnapshot(snap) } } // Set CA for our consumer. o.setConsumerAssignment(cca) s.Debugf("JetStream cluster, consumer was already running") } // If we have an initial state set apply that now. if state != nil && o != nil { o.mu.Lock() err = o.setStoreState(state) o.mu.Unlock() } if err != nil { if IsNatsErr(err, JSConsumerStoreFailedErrF) { s.Warnf("Consumer create failed for "%s > %s > %s": %v", ca.Client.serviceAccount(), ca.Stream, ca.Name, err) err = errConsumerStoreFailed } js.mu.Lock() ca.err = err hasResponded := ca.responded // If out of space do nothing for now. if isOutOfSpaceErr(err) { hasResponded = true } if rg.node != nil { rg.node.Delete() } var result *consumerAssignmentResult if !hasResponded && !js.metaRecovering { result = &consumerAssignmentResult{ Account: ca.Client.serviceAccount(), Stream: ca.Stream, Consumer: ca.Name, Response: &JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}, } result.Response.Error = NewJSConsumerCreateError(err, Unless(err)) } else if err == errNoInterest { // This is a stranded ephemeral, let"s clean this one up. subject := fmt.Sprintf(JSApiConsumerDeleteT, ca.Stream, ca.Name) mset.outq.send(newJSPubMsg(subject, _EMPTY_, _EMPTY_, nil, nil, nil, 0)) } js.mu.Unlock() if result != nil { // Send response to the metadata leader. They will forward to the user as needed. b, _ := json.Marshal(result) // Avoids auto-processing and doing fancy json with newlines. s.sendInternalMsgLocked(consumerAssignmentSubj, _EMPTY_, nil, b) } } else { if didCreate { o.setCreatedTime(ca.Created) } else { // Check for scale down to 1.. if rg.node != nil && len(rg.Peers) == 1 { o.clearNode() o.setLeader(true) // Need to clear from rg too. js.mu.Lock() rg.node = nil client, subject, reply := ca.Client, ca.Subject, ca.Reply js.mu.Unlock() var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}} resp.ConsumerInfo = o.info() s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) return } } if rg.node == nil { // Single replica consumer, process manually here. js.mu.Lock() // Force response in case we think this is an update. 
if !js.metaRecovering && isConfigUpdate { ca.responded = false } js.mu.Unlock() js.processConsumerLeaderChange(o, true) } else { // Clustered consumer. // Start our monitoring routine if needed. if !alreadyRunning && !o.isMonitorRunning() { s.startGoRoutine(func() { js.monitorConsumer(o, ca) }) } // For existing consumer, only send response if not recovering. if wasExisting && !js.isMetaRecovering() { if o.IsLeader() || (!didCreate && needsLocalResponse) { // Process if existing as an update. Double check that this is not recovered. js.mu.RLock() client, subject, reply, recovering := ca.Client, ca.Subject, ca.Reply, ca.recovering js.mu.RUnlock() if !recovering { var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}} resp.ConsumerInfo = o.info() s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) } } } } } } func (js *jetStream) processClusterDeleteConsumer(ca *consumerAssignment, isMember, wasLeader bool) { if ca == nil { return } js.mu.RLock() s := js.srv node := ca.Group.node offline := s.allPeersOffline(ca.Group) var isMetaLeader bool if cc := js.cluster; cc != nil { isMetaLeader = cc.isLeader() } recovering := ca.recovering js.mu.RUnlock() stopped := false var resp = JSApiConsumerDeleteResponse{ApiResponse: ApiResponse{Type: JSApiConsumerDeleteResponseType}} var err error var acc *Account // Go ahead and delete the consumer if we have it and the account. if acc, _ = s.LookupAccount(ca.Client.serviceAccount()); acc != nil { if mset, _ := acc.lookupStream(ca.Stream); mset != nil { if o := mset.lookupConsumer(ca.Name); o != nil { err = o.stopWithFlags(true, false, true, wasLeader) stopped = true } } } // Always delete the node if present. if node != nil { node.Delete() } // This is a stop gap cleanup in case // 1) the account does not exist (and mset consumer couldn"t be stopped) and/or // 2) node was nil (and couldn"t be deleted) if !stopped || node == nil { if sacc := s.SystemAccount(); sacc != nil { os.RemoveAll(filepath.Join(js.config.StoreDir, sacc.GetName(), defaultStoreDirName, ca.Group.Name)) } } if !wasLeader || ca.Reply == _EMPTY_ { if !(offline && isMetaLeader) { return } } // Do not respond if the account does not exist any longer or this is during recovery. if acc == nil || recovering { return } if err != nil { resp.Error = NewJSStreamNotFoundError(Unless(err)) s.sendAPIErrResponse(ca.Client, acc, ca.Subject, ca.Reply, _EMPTY_, s.jsonResponse(resp)) } else { resp.Success = true s.sendAPIResponse(ca.Client, acc, ca.Subject, ca.Reply, _EMPTY_, s.jsonResponse(resp)) } } // Returns the consumer assignment, or nil if not present. // Lock should be held. func (js *jetStream) consumerAssignment(account, stream, consumer string) *consumerAssignment { if sa := js.streamAssignment(account, stream); sa != nil { return sa.consumers[consumer] } return nil } // consumerAssigned informs us if this server has this consumer assigned. func (jsa *jsAccount) consumerAssigned(stream, consumer string) bool { jsa.mu.RLock() js, acc := jsa.js, jsa.account jsa.mu.RUnlock() if js == nil { return false } js.mu.RLock() defer js.mu.RUnlock() return js.cluster.isConsumerAssigned(acc, stream, consumer) } // Read lock should be held. func (cc *jetStreamCluster) isConsumerAssigned(a *Account, stream, consumer string) bool { // Non-clustered mode always return true. 
if cc == nil { return true } if cc.meta == nil { return false } var sa *streamAssignment accStreams := cc.streams[a.Name] if accStreams != nil { sa = accStreams[stream] } if sa == nil { // TODO(dlc) - This should not happen. return false } ca := sa.consumers[consumer] if ca == nil { return false } rg := ca.Group // Check if we are the leader of this raftGroup assigned to the stream. ourID := cc.meta.ID() for _, peer := range rg.Peers { if peer == ourID { return true } } return false } // Returns our stream and underlying raft node. func (o *consumer) streamAndNode() (*stream, RaftNode) { if o == nil { return nil, nil } o.mu.RLock() defer o.mu.RUnlock() return o.mset, o.node } // Return the replica count for this consumer. If the consumer has been // stopped, this will return an error. func (o *consumer) replica() (int, error) { o.mu.RLock() oCfg := o.cfg mset := o.mset o.mu.RUnlock() if mset == nil { return 0, errBadConsumer } sCfg := mset.config() return oCfg.replicas(&sCfg), nil } func (o *consumer) raftGroup() *raftGroup { if o == nil { return nil } o.mu.RLock() defer o.mu.RUnlock() if o.ca == nil { return nil } return o.ca.Group } func (o *consumer) clearRaftNode() { if o == nil { return } o.mu.Lock() defer o.mu.Unlock() o.node = nil } func (o *consumer) raftNode() RaftNode { if o == nil { return nil } o.mu.RLock() defer o.mu.RUnlock() return o.node } func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { s, n, cc := js.server(), o.raftNode(), js.cluster defer s.grWG.Done() if n == nil { s.Warnf("No RAFT group for "%s > %s > %s"", o.acc.Name, ca.Stream, ca.Name) return } // Make sure only one is running. if o.checkInMonitor() { return } defer o.clearMonitorRunning() qch, lch, aq, uch, ourPeerId := n.QuitC(), n.LeadChangeC(), n.ApplyQ(), o.updateC(), cc.meta.ID() s.Debugf("Starting consumer monitor for "%s > %s > %s" [%s]", o.acc.Name, ca.Stream, ca.Name, n.Group()) defer s.Debugf("Exiting consumer monitor for "%s > %s > %s" [%s]", o.acc.Name, ca.Stream, ca.Name, n.Group()) // Make sure to stop the raft group on exit to prevent accidental memory bloat. defer n.Stop() const ( compactInterval = 2 * time.Minute compactSizeMin = 64 * 1024 // What is stored here is always small for consumers. compactNumMin = 1024 minSnapDelta = 10 * time.Second ) // Spread these out for large numbers on server restart. rci := time.Duration(rand.Int63n(int64(time.Minute))) t := time.NewTicker(compactInterval + rci) defer t.Stop() // Highwayhash key for generating hashes. key := make([]byte, 32) rand.Read(key) // Hash of the last snapshot (fixed size in memory). var lastSnap []byte var lastSnapTime time.Time doSnapshot := func() { // Bail if trying too fast and not in a forced situation. if time.Since(lastSnapTime) < minSnapDelta { return } // Check several things to see if we need a snapshot. ne, nb := n.Size() if !n.NeedSnapshot() { // Check if we should compact etc. based on size of log. if ne < compactNumMin && nb < compactSizeMin { return } } if snap, err := o.store.EncodedState(); err == nil { hash := highwayhash.Sum(snap, key) // If the state hasn"t changed but the log has gone way over // the compaction size then we will want to compact anyway. // This can happen for example when a pull consumer fetches a // lot on an idle stream, log entries get distributed but the // state never changes, therefore the log never gets compacted. 
if !bytes.Equal(hash[:], lastSnap) || ne >= compactNumMin || nb >= compactSizeMin { if err := n.InstallSnapshot(snap); err == nil { lastSnap, lastSnapTime = hash[:], time.Now() } else if err != errNoSnapAvailable && err != errNodeClosed { s.Warnf("Failed to install snapshot for "%s > %s > %s" [%s]: %v", o.acc.Name, ca.Stream, ca.Name, n.Group(), err) } } } } // For migration tracking. var mmt *time.Ticker var mmtc <-chan time.Time startMigrationMonitoring := func() { if mmt == nil { mmt = time.NewTicker(500 * time.Millisecond) mmtc = mmt.C } } stopMigrationMonitoring := func() { if mmt != nil { mmt.Stop() mmt, mmtc = nil, nil } } defer stopMigrationMonitoring() // Track if we are leader. var isLeader bool recovering := true for { select { case <-s.quitCh: return case <-qch: return case <-aq.ch: ces := aq.pop() for _, ce := range ces { // No special processing needed for when we are caught up on restart. if ce == nil { recovering = false if n.NeedSnapshot() { doSnapshot() } continue } if err := js.applyConsumerEntries(o, ce, isLeader); err == nil { ne, nb := n.Applied(ce.Index) // If we have at least min entries to compact, go ahead and snapshot/compact. if nb > 0 && ne >= compactNumMin || nb > compactSizeMin { doSnapshot() } } else { s.Warnf("Error applying consumer entries to "%s > %s"", ca.Client.serviceAccount(), ca.Name) } } aq.recycle(&ces) case isLeader = <-lch: if recovering && !isLeader { js.setConsumerAssignmentRecovering(ca) } if err := js.processConsumerLeaderChange(o, isLeader); err == nil && isLeader { doSnapshot() } // We may receive a leader change after the consumer assignment which would cancel us // monitoring for this closely. So re-assess our state here as well. // Or the old leader is no longer part of the set and transferred leadership // for this leader to resume with removal rg := o.raftGroup() // Check for migrations (peer count and replica count differ) here. // We set the state on the stream assignment update below. replicas, err := o.replica() if err != nil { continue } if isLeader && len(rg.Peers) != replicas { startMigrationMonitoring() } else { stopMigrationMonitoring() } case <-uch: // keep consumer assignment current ca = o.consumerAssignment() // We get this when we have a new consumer assignment caused by an update. // We want to know if we are migrating. rg := o.raftGroup() // keep peer list up to date with config js.checkPeers(rg) // If we are migrating, monitor for the new peers to be caught up. replicas, err := o.replica() if err != nil { continue } if isLeader && len(rg.Peers) != replicas { startMigrationMonitoring() } else { stopMigrationMonitoring() } case <-mmtc: if !isLeader { // We are no longer leader, so not our job. stopMigrationMonitoring() continue } rg := o.raftGroup() ci := js.clusterInfo(rg) replicas, err := o.replica() if err != nil { continue } if len(rg.Peers) <= replicas { // Migration no longer happening, so not our job anymore stopMigrationMonitoring() continue } newPeers, oldPeers, newPeerSet, _ := genPeerInfo(rg.Peers, len(rg.Peers)-replicas) // If we are part of the new peerset and we have been passed the baton. // We will handle scale down. 
if newPeerSet[ourPeerId] { for _, p := range oldPeers { n.ProposeRemovePeer(p) } cca := ca.copyGroup() cca.Group.Peers = newPeers cca.Group.Cluster = s.cachedClusterName() cc.meta.ForwardProposal(encodeAddConsumerAssignment(cca)) s.Noticef("Scaling down '%s > %s > %s' to %+v", ca.Client.serviceAccount(), ca.Stream, ca.Name, s.peerSetToNames(newPeers)) } else { var newLeaderPeer, newLeader, newCluster string neededCurrent, current := replicas/2+1, 0 for _, r := range ci.Replicas { if r.Current && newPeerSet[r.Peer] { current++ if newCluster == _EMPTY_ { newLeaderPeer, newLeader, newCluster = r.Peer, r.Name, r.cluster } } } // Check if we have a quorum. if current >= neededCurrent { s.Noticef("Transfer of consumer leader for '%s > %s > %s' to '%s'", ca.Client.serviceAccount(), ca.Stream, ca.Name, newLeader) n.StepDown(newLeaderPeer) } } case <-t.C: doSnapshot() } } } func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLeader bool) error { for _, e := range ce.Entries { if e.Type == EntrySnapshot { // No-op needed? state, err := decodeConsumerState(e.Data) if err != nil { if mset, node := o.streamAndNode(); mset != nil && node != nil { s := js.srv s.Errorf("JetStream cluster could not decode consumer snapshot for '%s > %s > %s' [%s]", mset.account(), mset.name(), o, node.Group()) } panic(err.Error()) } if err = o.store.Update(state); err != nil { o.mu.RLock() s, acc, mset, name := o.srv, o.acc, o.mset, o.name o.mu.RUnlock() if s != nil && mset != nil { s.Warnf("Consumer '%s > %s > %s' error on store update from snapshot entry: %v", acc, mset.name(), name, err) } } else if state, err := o.store.State(); err == nil { // See if we need to process this update if our parent stream is not a limits policy stream. o.mu.RLock() mset := o.mset shouldProcessAcks := mset != nil && o.retention != LimitsPolicy o.mu.RUnlock() // We should make sure to update the acks. if shouldProcessAcks { var ss StreamState mset.store.FastState(&ss) for seq := ss.FirstSeq; seq <= state.AckFloor.Stream; seq++ { mset.ackMsg(o, seq) } } } } else if e.Type == EntryRemovePeer { js.mu.RLock() var ourID string if js.cluster != nil && js.cluster.meta != nil { ourID = js.cluster.meta.ID() } js.mu.RUnlock() if peer := string(e.Data); peer == ourID { shouldRemove := true if mset := o.getStream(); mset != nil { if sa := mset.streamAssignment(); sa != nil && sa.Group != nil { js.mu.RLock() shouldRemove = !sa.Group.isMember(ourID) js.mu.RUnlock() } } if shouldRemove { o.stopWithFlags(true, false, false, false) } } return nil } else if e.Type == EntryAddPeer { // Ignore for now. } else { buf := e.Data switch entryOp(buf[0]) { case updateDeliveredOp: // These are handled in place in leaders. if !isLeader { dseq, sseq, dc, ts, err := decodeDeliveredUpdate(buf[1:]) if err != nil { if mset, node := o.streamAndNode(); mset != nil && node != nil { s := js.srv s.Errorf("JetStream cluster could not decode consumer delivered update for '%s > %s > %s' [%s]", mset.account(), mset.name(), o, node.Group()) } panic(err.Error()) } // Make sure to update delivered under the lock.
o.mu.Lock() err = o.store.UpdateDelivered(dseq, sseq, dc, ts) o.ldt = time.Now() o.mu.Unlock() if err != nil { panic(err.Error()) } } case updateAcksOp: dseq, sseq, err := decodeAckUpdate(buf[1:]) if err != nil { if mset, node := o.streamAndNode(); mset != nil && node != nil { s := js.srv s.Errorf("JetStream cluster could not decode consumer ack update for "%s > %s > %s" [%s]", mset.account(), mset.name(), o, node.Group()) } panic(err.Error()) } o.processReplicatedAck(dseq, sseq) case updateSkipOp: o.mu.Lock() if !o.isLeader() { var le = binary.LittleEndian if sseq := le.Uint64(buf[1:]); sseq > o.sseq { o.sseq = sseq } } o.mu.Unlock() case addPendingRequest: o.mu.Lock() if !o.isLeader() { if o.prm == nil { o.prm = make(map[string]struct{}) } o.prm[string(buf[1:])] = struct{}{} } o.mu.Unlock() case removePendingRequest: o.mu.Lock() if !o.isLeader() { if o.prm != nil { delete(o.prm, string(buf[1:])) } } o.mu.Unlock() default: panic(fmt.Sprintf("JetStream Cluster Unknown group entry op type! %v", entryOp(buf[0]))) } } } return nil } func (o *consumer) processReplicatedAck(dseq, sseq uint64) { o.mu.Lock() // Update activity. o.lat = time.Now() // Do actual ack update to store. o.store.UpdateAcks(dseq, sseq) mset := o.mset if mset == nil || o.retention == LimitsPolicy { o.mu.Unlock() return } var sagap uint64 if o.cfg.AckPolicy == AckAll { if o.isLeader() { sagap = sseq - o.asflr } else { // We are a follower so only have the store state, so read that in. state, err := o.store.State() if err != nil { o.mu.Unlock() return } sagap = sseq - state.AckFloor.Stream } } o.mu.Unlock() if sagap > 1 { // FIXME(dlc) - This is very inefficient, will need to fix. for seq := sseq; seq > sseq-sagap; seq-- { mset.ackMsg(o, seq) } } else { mset.ackMsg(o, sseq) } } var errBadAckUpdate = errors.New("jetstream cluster bad replicated ack update") var errBadDeliveredUpdate = errors.New("jetstream cluster bad replicated delivered update") func decodeAckUpdate(buf []byte) (dseq, sseq uint64, err error) { var bi, n int if dseq, n = binary.Uvarint(buf); n < 0 { return 0, 0, errBadAckUpdate } bi += n if sseq, n = binary.Uvarint(buf[bi:]); n < 0 { return 0, 0, errBadAckUpdate } return dseq, sseq, nil } func decodeDeliveredUpdate(buf []byte) (dseq, sseq, dc uint64, ts int64, err error) { var bi, n int if dseq, n = binary.Uvarint(buf); n < 0 { return 0, 0, 0, 0, errBadDeliveredUpdate } bi += n if sseq, n = binary.Uvarint(buf[bi:]); n < 0 { return 0, 0, 0, 0, errBadDeliveredUpdate } bi += n if dc, n = binary.Uvarint(buf[bi:]); n < 0 { return 0, 0, 0, 0, errBadDeliveredUpdate } bi += n if ts, n = binary.Varint(buf[bi:]); n < 0 { return 0, 0, 0, 0, errBadDeliveredUpdate } return dseq, sseq, dc, ts, nil } func (js *jetStream) processConsumerLeaderChange(o *consumer, isLeader bool) error { stepDownIfLeader := func() error { if node := o.raftNode(); node != nil && isLeader { node.StepDown() } return errors.New("failed to update consumer leader status") } ca := o.consumerAssignment() if ca == nil { return stepDownIfLeader() } js.mu.Lock() s, account, err := js.srv, ca.Client.serviceAccount(), ca.err client, subject, reply := ca.Client, ca.Subject, ca.Reply hasResponded := ca.responded ca.responded = true js.mu.Unlock() streamName, consumerName := o.streamName(), o.String() acc, _ := s.LookupAccount(account) if acc == nil { return stepDownIfLeader() } if isLeader { s.Noticef("JetStream cluster new consumer leader for "%s > %s > %s"", ca.Client.serviceAccount(), streamName, consumerName) s.sendConsumerLeaderElectAdvisory(o) // Check 
for peer removal and process here if needed. js.checkPeers(ca.Group) } else { // We are stepping down. // Make sure if we are doing so because we have lost quorum that we send the appropriate advisories. if node := o.raftNode(); node != nil && !node.Quorum() && time.Since(node.Created()) > 5*time.Second { s.sendConsumerLostQuorumAdvisory(o) } } // Tell consumer to switch leader status. o.setLeader(isLeader) if !isLeader || hasResponded { if isLeader { o.clearInitialInfo() } return nil } var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}} if err != nil { resp.Error = NewJSConsumerCreateError(err, Unless(err)) s.sendAPIErrResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) } else { resp.ConsumerInfo = o.initialInfo() s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) if node := o.raftNode(); node != nil { o.sendCreateAdvisory() } } return nil } // Determines if we should send lost quorum advisory. We throttle these after first one. func (o *consumer) shouldSendLostQuorum() bool { o.mu.Lock() defer o.mu.Unlock() if time.Since(o.lqsent) >= lostQuorumAdvInterval { o.lqsent = time.Now() return true } return false } func (s *Server) sendConsumerLostQuorumAdvisory(o *consumer) { if o == nil { return } node, stream, consumer, acc := o.raftNode(), o.streamName(), o.String(), o.account() if node == nil { return } if !o.shouldSendLostQuorum() { return } s.Warnf("JetStream cluster consumer '%s > %s > %s' has NO quorum, stalled.", acc.GetName(), stream, consumer) subj := JSAdvisoryConsumerQuorumLostPre + "." + stream + "." + consumer adv := &JSConsumerQuorumLostAdvisory{ TypedEvent: TypedEvent{ Type: JSConsumerQuorumLostAdvisoryType, ID: nuid.Next(), Time: time.Now().UTC(), }, Stream: stream, Consumer: consumer, Replicas: s.replicas(node), Domain: s.getOpts().JetStreamDomain, } // Send to the user's account if not the system account. if acc != s.SystemAccount() { s.publishAdvisory(acc, subj, adv) } // Now do system level one. Place account info in adv, and nil account means system. adv.Account = acc.GetName() s.publishAdvisory(nil, subj, adv) } func (s *Server) sendConsumerLeaderElectAdvisory(o *consumer) { if o == nil { return } node, stream, consumer, acc := o.raftNode(), o.streamName(), o.String(), o.account() if node == nil { return } subj := JSAdvisoryConsumerLeaderElectedPre + "." + stream + "." + consumer adv := &JSConsumerLeaderElectedAdvisory{ TypedEvent: TypedEvent{ Type: JSConsumerLeaderElectedAdvisoryType, ID: nuid.Next(), Time: time.Now().UTC(), }, Stream: stream, Consumer: consumer, Leader: s.serverNameForNode(node.GroupLeader()), Replicas: s.replicas(node), Domain: s.getOpts().JetStreamDomain, } // Send to the user's account if not the system account. if acc != s.SystemAccount() { s.publishAdvisory(acc, subj, adv) } // Now do system level one. Place account info in adv, and nil account means system. adv.Account = acc.GetName() s.publishAdvisory(nil, subj, adv) } type streamAssignmentResult struct { Account string `json:"account"` Stream string `json:"stream"` Response *JSApiStreamCreateResponse `json:"create_response,omitempty"` Restore *JSApiStreamRestoreResponse `json:"restore_response,omitempty"` Update bool `json:"is_update,omitempty"` } // Determine if this is an insufficient resources' error type.
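// Such failures are treated as retryable: if the create did not pin a specific cluster and the client
// has alternates, processStreamAssignmentResults below will re-propose the assignment on another cluster.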
func isInsufficientResourcesErr(resp *JSApiStreamCreateResponse) bool { return resp != nil && resp.Error != nil && IsNatsErr(resp.Error, JSInsufficientResourcesErr, JSMemoryResourcesExceededErr, JSStorageResourcesExceededErr) } // Process error results of stream and consumer assignments. // Success will be handled by stream leader. func (js *jetStream) processStreamAssignmentResults(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { var result streamAssignmentResult if err := json.Unmarshal(msg, &result); err != nil { // TODO(dlc) - log return } acc, _ := js.srv.LookupAccount(result.Account) if acc == nil { // TODO(dlc) - log return } js.mu.Lock() defer js.mu.Unlock() s, cc := js.srv, js.cluster // This should have been done already in processStreamAssignment, but in // case we have a code path that gets here with no processStreamAssignment, // then we will do the proper thing. Otherwise will be a no-op. cc.removeInflightProposal(result.Account, result.Stream) // FIXME(dlc) - suppress duplicates? if sa := js.streamAssignment(result.Account, result.Stream); sa != nil { canDelete := !result.Update && time.Since(sa.Created) < 5*time.Second // See if we should retry in case this cluster is full but there are others. if cfg, ci := sa.Config, sa.Client; cfg != nil && ci != nil && isInsufficientResourcesErr(result.Response) && canDelete { // If cluster is defined we can not retry. if cfg.Placement == nil || cfg.Placement.Cluster == _EMPTY_ { // If we have additional clusters to try we can retry. if ci != nil && len(ci.Alternates) > 0 { if rg, err := js.createGroupForStream(ci, cfg); err != nil { s.Warnf("Retrying cluster placement for stream '%s > %s' failed due to placement error: %+v", result.Account, result.Stream, err) } else { if org := sa.Group; org != nil && len(org.Peers) > 0 { s.Warnf("Retrying cluster placement for stream '%s > %s' due to insufficient resources in cluster %q", result.Account, result.Stream, s.clusterNameForNode(org.Peers[0])) } else { s.Warnf("Retrying cluster placement for stream '%s > %s' due to insufficient resources", result.Account, result.Stream) } // Pick a new preferred leader. rg.setPreferred() // Get rid of previous attempt. cc.meta.Propose(encodeDeleteStreamAssignment(sa)) // Propose new. sa.Group, sa.err = rg, nil cc.meta.Propose(encodeAddStreamAssignment(sa)) return } } } } // Respond to the user here. var resp string if result.Response != nil { resp = s.jsonResponse(result.Response) } else if result.Restore != nil { resp = s.jsonResponse(result.Restore) } if !sa.responded || result.Update { sa.responded = true js.srv.sendAPIErrResponse(sa.Client, acc, sa.Subject, sa.Reply, _EMPTY_, resp) } // Remove this assignment if possible.
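// Note canDelete is only true for a recent (less than 5 seconds old) assignment that was not an update,
// so an errored create attempt is cleaned up here without touching established streams.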
if canDelete { sa.err = NewJSClusterNotAssignedError() cc.meta.Propose(encodeDeleteStreamAssignment(sa)) } } } func (js *jetStream) processConsumerAssignmentResults(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { var result consumerAssignmentResult if err := json.Unmarshal(msg, &result); err != nil { // TODO(dlc) - log return } acc, _ := js.srv.LookupAccount(result.Account) if acc == nil { // TODO(dlc) - log return } js.mu.Lock() defer js.mu.Unlock() s, cc := js.srv, js.cluster if sa := js.streamAssignment(result.Account, result.Stream); sa != nil && sa.consumers != nil { if ca := sa.consumers[result.Consumer]; ca != nil && !ca.responded { js.srv.sendAPIErrResponse(ca.Client, acc, ca.Subject, ca.Reply, _EMPTY_, s.jsonResponse(result.Response)) ca.responded = true // Check if this failed. // TODO(dlc) - Could have mixed results, should track per peer. // Make sure this is recent response, do not delete existing consumers. if result.Response.Error != nil && result.Response.Error != NewJSConsumerNameExistError() && time.Since(ca.Created) < 2*time.Second { // So while we are deleting we will not respond to list/names requests. ca.err = NewJSClusterNotAssignedError() cc.meta.Propose(encodeDeleteConsumerAssignment(ca)) s.Warnf("Proposing to delete consumer '%s > %s > %s' due to assignment response error: %v", result.Account, result.Stream, result.Consumer, result.Response.Error) } } } } const ( streamAssignmentSubj = "$SYS.JSC.STREAM.ASSIGNMENT.RESULT" consumerAssignmentSubj = "$SYS.JSC.CONSUMER.ASSIGNMENT.RESULT" ) // Lock should be held. func (js *jetStream) startUpdatesSub() { cc, s, c := js.cluster, js.srv, js.cluster.c if cc.streamResults == nil { cc.streamResults, _ = s.systemSubscribe(streamAssignmentSubj, _EMPTY_, false, c, js.processStreamAssignmentResults) } if cc.consumerResults == nil { cc.consumerResults, _ = s.systemSubscribe(consumerAssignmentSubj, _EMPTY_, false, c, js.processConsumerAssignmentResults) } if cc.stepdown == nil { cc.stepdown, _ = s.systemSubscribe(JSApiLeaderStepDown, _EMPTY_, false, c, s.jsLeaderStepDownRequest) } if cc.peerRemove == nil { cc.peerRemove, _ = s.systemSubscribe(JSApiRemoveServer, _EMPTY_, false, c, s.jsLeaderServerRemoveRequest) } if cc.peerStreamMove == nil { cc.peerStreamMove, _ = s.systemSubscribe(JSApiServerStreamMove, _EMPTY_, false, c, s.jsLeaderServerStreamMoveRequest) } if cc.peerStreamCancelMove == nil { cc.peerStreamCancelMove, _ = s.systemSubscribe(JSApiServerStreamCancelMove, _EMPTY_, false, c, s.jsLeaderServerStreamCancelMoveRequest) } if js.accountPurge == nil { js.accountPurge, _ = s.systemSubscribe(JSApiAccountPurge, _EMPTY_, false, c, s.jsLeaderAccountPurgeRequest) } } // Lock should be held.
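// stopUpdatesSub tears down the system subscriptions created in startUpdatesSub above. It is called
// from processLeaderChange below when this server is no longer the metadata leader.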
func (js *jetStream) stopUpdatesSub() { cc := js.cluster if cc.streamResults != nil { cc.s.sysUnsubscribe(cc.streamResults) cc.streamResults = nil } if cc.consumerResults != nil { cc.s.sysUnsubscribe(cc.consumerResults) cc.consumerResults = nil } if cc.stepdown != nil { cc.s.sysUnsubscribe(cc.stepdown) cc.stepdown = nil } if cc.peerRemove != nil { cc.s.sysUnsubscribe(cc.peerRemove) cc.peerRemove = nil } if cc.peerStreamMove != nil { cc.s.sysUnsubscribe(cc.peerStreamMove) cc.peerStreamMove = nil } if cc.peerStreamCancelMove != nil { cc.s.sysUnsubscribe(cc.peerStreamCancelMove) cc.peerStreamCancelMove = nil } if js.accountPurge != nil { cc.s.sysUnsubscribe(js.accountPurge) js.accountPurge = nil } } func (js *jetStream) processLeaderChange(isLeader bool) { if isLeader { js.srv.Noticef("Self is new JetStream cluster metadata leader") } else { var node string if meta := js.getMetaGroup(); meta != nil { node = meta.GroupLeader() } if node == _EMPTY_ { js.srv.Noticef("JetStream cluster no metadata leader") } else if srv := js.srv.serverNameForNode(node); srv == _EMPTY_ { js.srv.Noticef("JetStream cluster new remote metadata leader") } else if clst := js.srv.clusterNameForNode(node); clst == _EMPTY_ { js.srv.Noticef("JetStream cluster new metadata leader: %s", srv) } else { js.srv.Noticef("JetStream cluster new metadata leader: %s/%s", srv, clst) } } js.mu.Lock() defer js.mu.Unlock() if isLeader { js.startUpdatesSub() } else { js.stopUpdatesSub() // TODO(dlc) - stepdown. } // If we have been signaled to check the streams, this is for a bug that left stream // assignments with no sync subject after an update and no way to sync/catchup outside of the RAFT layer. if isLeader && js.cluster.streamsCheck { cc := js.cluster for acc, asa := range cc.streams { for _, sa := range asa { if sa.Sync == _EMPTY_ { js.srv.Warnf("Stream assignment corrupt for stream '%s > %s'", acc, sa.Config.Name) nsa := &streamAssignment{Group: sa.Group, Config: sa.Config, Subject: sa.Subject, Reply: sa.Reply, Client: sa.Client} nsa.Sync = syncSubjForStream() cc.meta.Propose(encodeUpdateStreamAssignment(nsa)) } } } // Clear check. cc.streamsCheck = false } } // Lock should be held. func (cc *jetStreamCluster) remapStreamAssignment(sa *streamAssignment, removePeer string) bool { // Invoke placement algo passing RG peers that stay (existing) and the peer that is being removed (ignore) var retain, ignore []string for _, v := range sa.Group.Peers { if v == removePeer { ignore = append(ignore, v) } else { retain = append(retain, v) } } newPeers, placementError := cc.selectPeerGroup(len(sa.Group.Peers), sa.Group.Cluster, sa.Config, retain, 0, ignore) if placementError == nil { sa.Group.Peers = newPeers // Don't influence preferred leader. sa.Group.Preferred = _EMPTY_ return true } // If we are here let's remove the peer at least.
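// The swap-with-last trick below drops removePeer from the peer list without preserving order.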
for i, peer := range sa.Group.Peers { if peer == removePeer { sa.Group.Peers[i] = sa.Group.Peers[len(sa.Group.Peers)-1] sa.Group.Peers = sa.Group.Peers[:len(sa.Group.Peers)-1] break } } return false } type selectPeerError struct { excludeTag bool offline bool noStorage bool uniqueTag bool misc bool noJsClust bool noMatchTags map[string]struct{} } func (e *selectPeerError) Error() string { b := strings.Builder{} writeBoolErrReason := func(hasErr bool, errMsg string) { if !hasErr { return } b.WriteString(", ") b.WriteString(errMsg) } b.WriteString("no suitable peers for placement") writeBoolErrReason(e.offline, "peer offline") writeBoolErrReason(e.excludeTag, "exclude tag set") writeBoolErrReason(e.noStorage, "insufficient storage") writeBoolErrReason(e.uniqueTag, "server tag not unique") writeBoolErrReason(e.misc, "miscellaneous issue") writeBoolErrReason(e.noJsClust, "jetstream not enabled in cluster") if len(e.noMatchTags) != 0 { b.WriteString(", tags not matched [") var firstTagWritten bool for tag := range e.noMatchTags { if firstTagWritten { b.WriteString(", ") } firstTagWritten = true b.WriteRune('"') b.WriteString(tag) b.WriteRune('"') } b.WriteString("]") } return b.String() } func (e *selectPeerError) addMissingTag(t string) { if e.noMatchTags == nil { e.noMatchTags = map[string]struct{}{} } e.noMatchTags[t] = struct{}{} } func (e *selectPeerError) accumulate(eAdd *selectPeerError) { if eAdd == nil { return } acc := func(val *bool, valAdd bool) { if valAdd { *val = valAdd } } acc(&e.offline, eAdd.offline) acc(&e.excludeTag, eAdd.excludeTag) acc(&e.noStorage, eAdd.noStorage) acc(&e.uniqueTag, eAdd.uniqueTag) acc(&e.misc, eAdd.misc) acc(&e.noJsClust, eAdd.noJsClust) for tag := range eAdd.noMatchTags { e.addMissingTag(tag) } } // selectPeerGroup will select a group of peers to start a raft group. // When peers already exist, the unique tag prefix check for the first replaceFirstExisting peers will be skipped. func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamConfig, existing []string, replaceFirstExisting int, ignore []string) ([]string, *selectPeerError) { if cluster == _EMPTY_ || cfg == nil { return nil, &selectPeerError{misc: true} } var maxBytes uint64 if cfg.MaxBytes > 0 { maxBytes = uint64(cfg.MaxBytes) } // Check for tags. var tags []string if cfg.Placement != nil && len(cfg.Placement.Tags) > 0 { tags = cfg.Placement.Tags } // Used for weighted sorting based on availability. type wn struct { id string avail uint64 ha int } var nodes []wn // peers is a randomized list s, peers := cc.s, cc.meta.Peers() uniqueTagPrefix := s.getOpts().JetStreamUniqueTag if uniqueTagPrefix != _EMPTY_ { for _, tag := range tags { if strings.HasPrefix(tag, uniqueTagPrefix) { // disable uniqueness check if explicitly listed in tags uniqueTagPrefix = _EMPTY_ break } } } var uniqueTags = make(map[string]*nodeInfo) checkUniqueTag := func(ni *nodeInfo) (bool, *nodeInfo) { for _, t := range ni.tags { if strings.HasPrefix(t, uniqueTagPrefix) { if n, ok := uniqueTags[t]; !ok { uniqueTags[t] = ni return true, ni } else { return false, n } } } // default requires the unique prefix to be present return false, nil } // Map existing.
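// Existing peers are kept and never re-selected below; we only record their unique tags so that
// any newly picked peers can not collide with them.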
var ep map[string]struct{} if le := len(existing); le > 0 { if le >= r { return existing[:r], nil } ep = make(map[string]struct{}) for i, p := range existing { ep[p] = struct{}{} if uniqueTagPrefix == _EMPTY_ { continue } si, ok := s.nodeToInfo.Load(p) if !ok || si == nil || i < replaceFirstExisting { continue } ni := si.(nodeInfo) // collect unique tags, but do not require them as this node is already part of the peerset checkUniqueTag(&ni) } } // Map ignore var ip map[string]struct{} if li := len(ignore); li > 0 { ip = make(map[string]struct{}) for _, p := range ignore { ip[p] = struct{}{} } } maxHaAssets := s.getOpts().JetStreamLimits.MaxHAAssets // An error is a result of multiple individual placement decisions. // Which is why we keep taps on how often which one happened. err := selectPeerError{} // Shuffle them up. rand.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] }) for _, p := range peers { si, ok := s.nodeToInfo.Load(p.ID) if !ok || si == nil { err.misc = true continue } ni := si.(nodeInfo) // Only select from the designated named cluster. if ni.cluster != cluster { s.Debugf("Peer selection: discard %s@%s reason: not target cluster %s", ni.name, ni.cluster, cluster) continue } // If we know its offline or we do not have config or err don"t consider. if ni.offline || ni.cfg == nil || ni.stats == nil { s.Debugf("Peer selection: discard %s@%s reason: offline", ni.name, ni.cluster) err.offline = true continue } // If ignore skip if _, ok := ip[p.ID]; ok { continue } // If existing also skip, we will add back in to front of the list when done. if _, ok := ep[p.ID]; ok { continue } if ni.tags.Contains(jsExcludePlacement) { s.Debugf("Peer selection: discard %s@%s tags: %v reason: %s present", ni.name, ni.cluster, ni.tags, jsExcludePlacement) err.excludeTag = true continue } if len(tags) > 0 { matched := true for _, t := range tags { if !ni.tags.Contains(t) { matched = false s.Debugf("Peer selection: discard %s@%s tags: %v reason: mandatory tag %s not present", ni.name, ni.cluster, ni.tags, t) err.addMissingTag(t) break } } if !matched { continue } } var available uint64 var ha int if ni.stats != nil { switch cfg.Storage { case MemoryStorage: used := ni.stats.ReservedMemory if ni.stats.Memory > used { used = ni.stats.Memory } if ni.cfg.MaxMemory > int64(used) { available = uint64(ni.cfg.MaxMemory) - used } case FileStorage: used := ni.stats.ReservedStore if ni.stats.Store > used { used = ni.stats.Store } if ni.cfg.MaxStore > int64(used) { available = uint64(ni.cfg.MaxStore) - used } } ha = ni.stats.HAAssets } // Otherwise check if we have enough room if maxBytes set. if maxBytes > 0 && maxBytes > available { s.Warnf("Peer selection: discard %s@%s (Max Bytes: %d) exceeds available %s storage of %d bytes", ni.name, ni.cluster, maxBytes, cfg.Storage.String(), available) err.noStorage = true continue } // HAAssets contain _meta_ which we want to ignore, hence > and not >=. 
if maxHaAssets > 0 && ni.stats != nil && ni.stats.HAAssets > maxHaAssets { s.Warnf("Peer selection: discard %s@%s (HA Asset Count: %d) exceeds max ha asset limit of %d for stream placement", ni.name, ni.cluster, ni.stats.HAAssets, maxHaAssets) err.misc = true continue } if uniqueTagPrefix != _EMPTY_ { if unique, owner := checkUniqueTag(&ni); !unique { if owner != nil { s.Debugf("Peer selection: discard %s@%s tags:%v reason: unique prefix %s owned by %s@%s", ni.name, ni.cluster, ni.tags, owner.name, owner.cluster) } else { s.Debugf("Peer selection: discard %s@%s tags:%v reason: unique prefix %s not present", ni.name, ni.cluster, ni.tags) } err.uniqueTag = true continue } } // Add to our list of potential nodes. nodes = append(nodes, wn{p.ID, available, ha}) } // If we could not select enough peers, fail. if len(nodes) < (r - len(existing)) { s.Debugf("Peer selection: required %d nodes but found %d (cluster: %s replica: %d existing: %v/%d peers: %d result-peers: %d err: %+v)", (r - len(existing)), len(nodes), cluster, r, existing, replaceFirstExisting, len(peers), len(nodes), err) if len(peers) == 0 { err.noJsClust = true } return nil, &err } // Sort based on available from most to least. sort.Slice(nodes, func(i, j int) bool { return nodes[i].avail > nodes[j].avail }) // If we are placing a replicated stream, let"s sort based in haAssets, as that is more important to balance. if cfg.Replicas > 1 { sort.SliceStable(nodes, func(i, j int) bool { return nodes[i].ha < nodes[j].ha }) } var results []string if len(existing) > 0 { results = append(results, existing...) r -= len(existing) } for _, r := range nodes[:r] { results = append(results, r.id) } return results, nil } func groupNameForStream(peers []string, storage StorageType) string { return groupName("S", peers, storage) } func groupNameForConsumer(peers []string, storage StorageType) string { return groupName("C", peers, storage) } func groupName(prefix string, peers []string, storage StorageType) string { gns := getHash(nuid.Next()) return fmt.Sprintf("%s-R%d%s-%s", prefix, len(peers), storage.String()[:1], gns) } // returns stream count for this tier as well as applicable reservation size (not including reservations for cfg) // jetStream read lock should be held func tieredStreamAndReservationCount(asa map[string]*streamAssignment, tier string, cfg *StreamConfig) (int, int64) { numStreams := len(asa) reservation := int64(0) if tier == _EMPTY_ { for _, sa := range asa { if sa.Config.MaxBytes > 0 && sa.Config.Name != cfg.Name { if sa.Config.Storage == cfg.Storage { reservation += (int64(sa.Config.Replicas) * sa.Config.MaxBytes) } } } } else { numStreams = 0 for _, sa := range asa { if isSameTier(sa.Config, cfg) { numStreams++ if sa.Config.MaxBytes > 0 { if sa.Config.Storage == cfg.Storage && sa.Config.Name != cfg.Name { reservation += (int64(sa.Config.Replicas) * sa.Config.MaxBytes) } } } } } return numStreams, reservation } // createGroupForStream will create a group for assignment for the stream. // Lock should be held. func (js *jetStream) createGroupForStream(ci *ClientInfo, cfg *StreamConfig) (*raftGroup, *selectPeerError) { replicas := cfg.Replicas if replicas == 0 { replicas = 1 } // Default connected cluster from the request origin. cc, cluster := js.cluster, ci.Cluster // If specified, override the default. clusterDefined := cfg.Placement != nil && cfg.Placement.Cluster != _EMPTY_ if clusterDefined { cluster = cfg.Placement.Cluster } clusters := []string{cluster} if !clusterDefined { clusters = append(clusters, ci.Alternates...) 
} // Need to create a group here. errs := &selectPeerError{} for _, cn := range clusters { peers, err := cc.selectPeerGroup(replicas, cn, cfg, nil, 0, nil) if len(peers) < replicas { errs.accumulate(err) continue } return &raftGroup{Name: groupNameForStream(peers, cfg.Storage), Storage: cfg.Storage, Peers: peers, Cluster: cn}, nil } return nil, errs } func (acc *Account) selectLimits(cfg *StreamConfig) (*JetStreamAccountLimits, string, *jsAccount, *ApiError) { // Grab our jetstream account info. acc.mu.RLock() jsa := acc.js acc.mu.RUnlock() if jsa == nil { return nil, _EMPTY_, nil, NewJSNotEnabledForAccountError() } jsa.usageMu.RLock() selectedLimits, tierName, ok := jsa.selectLimits(cfg) jsa.usageMu.RUnlock() if !ok { return nil, _EMPTY_, nil, NewJSNoLimitsError() } return &selectedLimits, tierName, jsa, nil } // Read lock needs to be held func (js *jetStream) jsClusteredStreamLimitsCheck(acc *Account, cfg *StreamConfig) *ApiError { selectedLimits, tier, _, apiErr := acc.selectLimits(cfg) if apiErr != nil { return apiErr } asa := js.cluster.streams[acc.Name] numStreams, reservations := tieredStreamAndReservationCount(asa, tier, cfg) // Check for inflight proposals... if cc := js.cluster; cc != nil && cc.inflight != nil { numStreams += len(cc.inflight[acc.Name]) } if selectedLimits.MaxStreams > 0 && numStreams >= selectedLimits.MaxStreams { return NewJSMaximumStreamsLimitError() } // Check for account limits here before proposing. if err := js.checkAccountLimits(selectedLimits, cfg, reservations); err != nil { return NewJSStreamLimitsError(err, Unless(err)) } return nil } func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject, reply string, rmsg []byte, config *StreamConfig) { js, cc := s.getJetStreamCluster() if js == nil || cc == nil { return } var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}} ccfg, apiErr := s.checkStreamCfg(config, acc) if apiErr != nil { resp.Error = apiErr s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } cfg := &ccfg // Now process the request and proposal. js.mu.Lock() defer js.mu.Unlock() // Capture if we have existing assignment first. osa := js.streamAssignment(acc.Name, cfg.Name) var areEqual bool if osa != nil { areEqual = reflect.DeepEqual(osa.Config, cfg) } // If this stream already exists, turn this into a stream info call. if osa != nil { // If they are the same then we will forward on as a stream info request. // This now matches single server behavior. if areEqual { // This works when we have a stream leader. If we have no leader let the dupe // go through as normal. We will handle properly on the other end. // We must check interest at the $SYS account layer, not user account since import // will always show interest. sisubj := fmt.Sprintf(clusterStreamInfoT, acc.Name, cfg.Name) if s.SystemAccount().Interest(sisubj) > 0 { isubj := fmt.Sprintf(JSApiStreamInfoT, cfg.Name) // We want to make sure we send along the client info. cij, _ := json.Marshal(ci) hdr := map[string]string{ ClientInfoHdr: string(cij), JSResponseType: jsCreateResponse, } // Send this as system account, but include client info header. 
s.sendInternalAccountMsgWithReply(nil, isubj, reply, hdr, nil, true) return } } else { resp.Error = NewJSStreamNameExistError() s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } } if cfg.Sealed { resp.Error = NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration for create can not be sealed")) s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } var self *streamAssignment if osa != nil && areEqual { self = osa } // Check for subject collisions here. if cc.subjectsOverlap(acc.Name, cfg.Subjects, self) { resp.Error = NewJSStreamSubjectOverlapError() s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } apiErr = js.jsClusteredStreamLimitsCheck(acc, cfg) // Check for stream limits here before proposing. These need to be tracked from meta layer, not jsa. if apiErr != nil { resp.Error = apiErr s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } // Raft group selection and placement. var rg *raftGroup if osa != nil && areEqual { rg = osa.Group } else { // Check inflight before proposing in case we have an existing inflight proposal. if cc.inflight == nil { cc.inflight = make(map[string]map[string]*raftGroup) } streams, ok := cc.inflight[acc.Name] if !ok { streams = make(map[string]*raftGroup) cc.inflight[acc.Name] = streams } else if existing, ok := streams[cfg.Name]; ok { // We have existing for same stream. Re-use same group. rg = existing } } // Create a new one here. if rg == nil { nrg, err := js.createGroupForStream(ci, cfg) if err != nil { resp.Error = NewJSClusterNoPeersError(err) s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } rg = nrg // Pick a preferred leader. rg.setPreferred() } // Sync subject for post snapshot sync. sa := &streamAssignment{Group: rg, Sync: syncSubjForStream(), Config: cfg, Subject: subject, Reply: reply, Client: ci, Created: time.Now().UTC()} if err := cc.meta.Propose(encodeAddStreamAssignment(sa)); err == nil { // On success, add this as an inflight proposal so we can apply limits // on concurrent create requests while this stream assignment has // possibly not been processed yet. if streams, ok := cc.inflight[acc.Name]; ok { streams[cfg.Name] = rg } } } var ( errReqTimeout = errors.New("timeout while waiting for response") errReqSrvExit = errors.New("server shutdown while waiting for response") ) // blocking utility call to perform requests on the system account // returns (synchronized) v or error func (s *Server) sysRequest(v interface{}, subjFormat string, args ...interface{}) (interface{}, error) { isubj := fmt.Sprintf(subjFormat, args...) s.mu.Lock() inbox := s.newRespInbox() results := make(chan interface{}, 1) // Store our handler. s.sys.replies[inbox] = func(sub *subscription, _ *client, _ *Account, subject, _ string, msg []byte) { if err := json.Unmarshal(msg, v); err != nil { s.Warnf("Error unmarshalling response for request '%s':%v", isubj, err) return } select { case results <- v: default: s.Warnf("Failed placing request response on internal channel") } } s.mu.Unlock() s.sendInternalMsgLocked(isubj, inbox, nil, nil) const timeout = 2 * time.Second notActive := time.NewTimer(timeout) defer notActive.Stop() var err error var data interface{} select { case <-s.quitCh: err = errReqSrvExit case <-notActive.C: err = errReqTimeout case data = <-results: } // Clean up here.
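// Always remove the reply handler for this inbox, whether we received a response, timed out or
// are shutting down, so s.sys.replies does not accumulate stale entries.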
s.mu.Lock() if s.sys != nil && s.sys.replies != nil { delete(s.sys.replies, inbox) } s.mu.Unlock() return data, err } func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, subject, reply string, rmsg []byte, cfg *StreamConfig, peerSet []string) { js, cc := s.getJetStreamCluster() if js == nil || cc == nil { return } // Now process the request and proposal. js.mu.Lock() defer js.mu.Unlock() var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}} osa := js.streamAssignment(acc.Name, cfg.Name) if osa == nil { resp.Error = NewJSStreamNotFoundError() s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } var newCfg *StreamConfig if jsa := js.accounts[acc.Name]; jsa != nil { js.mu.Unlock() ncfg, err := jsa.configUpdateCheck(osa.Config, cfg, s) js.mu.Lock() if err != nil { resp.Error = NewJSStreamUpdateError(err, Unless(err)) s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } else { newCfg = ncfg } } else { resp.Error = NewJSNotEnabledForAccountError() s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } // Check for mirror changes which are not allowed. if !reflect.DeepEqual(newCfg.Mirror, osa.Config.Mirror) { resp.Error = NewJSStreamMirrorNotUpdatableError() s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } // Check for subject collisions here. if cc.subjectsOverlap(acc.Name, cfg.Subjects, osa) { resp.Error = NewJSStreamSubjectOverlapError() s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } // Make copy so to not change original. rg := osa.copyGroup().Group // Check for a move request. var isMoveRequest, isMoveCancel bool if lPeerSet := len(peerSet); lPeerSet > 0 { isMoveRequest = true // check if this is a cancellation if lPeerSet == osa.Config.Replicas && lPeerSet <= len(rg.Peers) { isMoveCancel = true // can only be a cancellation if the peer sets overlap as expected for i := 0; i < lPeerSet; i++ { if peerSet[i] != rg.Peers[i] { isMoveCancel = false break } } } } else { isMoveRequest = newCfg.Placement != nil && !reflect.DeepEqual(osa.Config.Placement, newCfg.Placement) } // Check for replica changes. isReplicaChange := newCfg.Replicas != osa.Config.Replicas // We stage consumer updates and do them after the stream update. var consumers []*consumerAssignment // Check if this is a move request, but no cancellation, and we are already moving this stream. if isMoveRequest && !isMoveCancel && osa.Config.Replicas != len(rg.Peers) { // obtain stats to include in error message msg := _EMPTY_ if s.allPeersOffline(rg) { msg = fmt.Sprintf("all %d peers offline", len(rg.Peers)) } else { // Need to release js lock. js.mu.Unlock() if si, err := s.sysRequest(&StreamInfo{}, clusterStreamInfoT, ci.serviceAccount(), cfg.Name); err != nil { msg = fmt.Sprintf("error retrieving info: %s", err.Error()) } else if si := si.(*StreamInfo); si != nil { currentCount := 0 if si.Cluster.Leader != _EMPTY_ { currentCount++ } combinedLag := uint64(0) for _, r := range si.Cluster.Replicas { if r.Current { currentCount++ } combinedLag += r.Lag } msg = fmt.Sprintf("total peers: %d, current peers: %d, combined lag: %d", len(rg.Peers), currentCount, combinedLag) } // Re-acquire here. 
js.mu.Lock() } resp.Error = NewJSStreamMoveInProgressError(msg) s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } // Can not move and scale at same time. if isMoveRequest && isReplicaChange { resp.Error = NewJSStreamMoveAndScaleError() s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } if isReplicaChange { // We are adding new peers here. if newCfg.Replicas > len(rg.Peers) { peers, err := cc.selectPeerGroup(newCfg.Replicas, rg.Cluster, newCfg, rg.Peers, 0, nil) if err != nil { resp.Error = NewJSClusterNoPeersError(err) s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } // Single nodes are not recorded by the NRG layer so we can rename. if len(peers) == 1 { rg.Name = groupNameForStream(peers, rg.Storage) } else if len(rg.Peers) == 1 { // This is a scale up from being a singleton, set preferred to that singleton. rg.Preferred = rg.Peers[0] } rg.Peers = peers } else { // We are deleting nodes here. We want to do our best to preserve the current leader. // We have support now from above that guarantees we are in our own Go routine, so can // ask for stream info from the stream leader to make sure we keep the leader in the new list. var curLeader string if !s.allPeersOffline(rg) { // Need to release js lock. js.mu.Unlock() if si, err := s.sysRequest(&StreamInfo{}, clusterStreamInfoT, ci.serviceAccount(), cfg.Name); err != nil { s.Warnf("Did not receive stream info results for '%s > %s' due to: %s", acc, cfg.Name, err) } else if cl := si.(*StreamInfo).Cluster; cl != nil && cl.Leader != _EMPTY_ { curLeader = getHash(cl.Leader) } // Re-acquire here. js.mu.Lock() } // If we identified a leader make sure it's part of the new group. selected := make([]string, 0, newCfg.Replicas) if curLeader != _EMPTY_ { selected = append(selected, curLeader) } for _, peer := range rg.Peers { if len(selected) == newCfg.Replicas { break } if peer == curLeader { continue } if si, ok := s.nodeToInfo.Load(peer); ok && si != nil { if si.(nodeInfo).offline { continue } selected = append(selected, peer) } } rg.Peers = selected } // Need to remap any consumers. for _, ca := range osa.consumers { // Ephemerals are R=1, so only auto-remap durables, or R>1, unless stream is interest or workqueue policy. numPeers := len(ca.Group.Peers) if ca.Config.Durable != _EMPTY_ || numPeers > 1 || cfg.Retention != LimitsPolicy { cca := ca.copyGroup() // Adjust preferred as needed. if numPeers == 1 && len(rg.Peers) > 1 { cca.Group.Preferred = ca.Group.Peers[0] } else { cca.Group.Preferred = _EMPTY_ } // Assign new peers. cca.Group.Peers = rg.Peers // We can not propose here before the stream itself so we collect them. consumers = append(consumers, cca) } } } else if isMoveRequest { if len(peerSet) == 0 { nrg, err := js.createGroupForStream(ci, newCfg) if err != nil { resp.Error = NewJSClusterNoPeersError(err) s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } // filter peers present in both sets for _, peer := range rg.Peers { found := false for _, newPeer := range nrg.Peers { if peer == newPeer { found = true break } } if !found { peerSet = append(peerSet, peer) } } peerSet = append(peerSet, nrg.Peers...)
} if len(rg.Peers) == 1 { rg.Preferred = peerSet[0] } rg.Peers = peerSet for _, ca := range osa.consumers { cca := ca.copyGroup() r := cca.Config.replicas(osa.Config) // shuffle part of cluster peer set we will be keeping randPeerSet := copyStrings(peerSet[len(peerSet)-newCfg.Replicas:]) rand.Shuffle(newCfg.Replicas, func(i, j int) { randPeerSet[i], randPeerSet[j] = randPeerSet[j], randPeerSet[i] }) // move overlapping peers at the end of randPeerSet and keep a tally of non overlapping peers dropPeerSet := make([]string, 0, len(cca.Group.Peers)) for _, p := range cca.Group.Peers { found := false for i, rp := range randPeerSet { if p == rp { randPeerSet[i] = randPeerSet[newCfg.Replicas-1] randPeerSet[newCfg.Replicas-1] = p found = true break } } if !found { dropPeerSet = append(dropPeerSet, p) } } cPeerSet := randPeerSet[newCfg.Replicas-r:] // In case of a set or cancel simply assign if len(peerSet) == newCfg.Replicas { cca.Group.Peers = cPeerSet } else { cca.Group.Peers = append(dropPeerSet, cPeerSet...) } // make sure it overlaps with peers and remove if not if cca.Group.Preferred != _EMPTY_ { found := false for _, p := range cca.Group.Peers { if p == cca.Group.Preferred { found = true break } } if !found { cca.Group.Preferred = _EMPTY_ } } // We can not propose here before the stream itself so we collect them. consumers = append(consumers, cca) } } else { // All other updates make sure no preferred is set. rg.Preferred = _EMPTY_ } sa := &streamAssignment{Group: rg, Sync: osa.Sync, Created: osa.Created, Config: newCfg, Subject: subject, Reply: reply, Client: ci} cc.meta.Propose(encodeUpdateStreamAssignment(sa)) // Process any staged consumers. for _, ca := range consumers { cc.meta.Propose(encodeAddConsumerAssignment(ca)) } } func (s *Server) jsClusteredStreamDeleteRequest(ci *ClientInfo, acc *Account, stream, subject, reply string, rmsg []byte) { js, cc := s.getJetStreamCluster() if js == nil || cc == nil { return } js.mu.Lock() defer js.mu.Unlock() osa := js.streamAssignment(acc.Name, stream) if osa == nil { var resp = JSApiStreamDeleteResponse{ApiResponse: ApiResponse{Type: JSApiStreamDeleteResponseType}} resp.Error = NewJSStreamNotFoundError() s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } sa := &streamAssignment{Group: osa.Group, Config: osa.Config, Subject: subject, Reply: reply, Client: ci} cc.meta.Propose(encodeDeleteStreamAssignment(sa)) } // Process a clustered purge request. 
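// If the stream assignment has a raft group node the purge is proposed through the group so every
// replica applies it; for single replica streams (no node) the purge is performed directly on mset below.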
func (s *Server) jsClusteredStreamPurgeRequest( ci *ClientInfo, acc *Account, mset *stream, stream, subject, reply string, rmsg []byte, preq *JSApiStreamPurgeRequest, ) { js, cc := s.getJetStreamCluster() if js == nil || cc == nil { return } js.mu.Lock() sa := js.streamAssignment(acc.Name, stream) if sa == nil { resp := JSApiStreamPurgeResponse{ApiResponse: ApiResponse{Type: JSApiStreamPurgeResponseType}} resp.Error = NewJSStreamNotFoundError() s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) js.mu.Unlock() return } if n := sa.Group.node; n != nil { sp := &streamPurge{Stream: stream, LastSeq: mset.state().LastSeq, Subject: subject, Reply: reply, Client: ci, Request: preq} n.Propose(encodeStreamPurge(sp)) js.mu.Unlock() return } js.mu.Unlock() if mset == nil { return } var resp = JSApiStreamPurgeResponse{ApiResponse: ApiResponse{Type: JSApiStreamPurgeResponseType}} purged, err := mset.purge(preq) if err != nil { resp.Error = NewJSStreamGeneralError(err, Unless(err)) } else { resp.Purged = purged resp.Success = true } s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(resp)) } func (s *Server) jsClusteredStreamRestoreRequest( ci *ClientInfo, acc *Account, req *JSApiStreamRestoreRequest, stream, subject, reply string, rmsg []byte) { js, cc := s.getJetStreamCluster() if js == nil || cc == nil { return } js.mu.Lock() defer js.mu.Unlock() cfg := &req.Config resp := JSApiStreamRestoreResponse{ApiResponse: ApiResponse{Type: JSApiStreamRestoreResponseType}} if err := js.jsClusteredStreamLimitsCheck(acc, cfg); err != nil { resp.Error = err s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } if sa := js.streamAssignment(ci.serviceAccount(), cfg.Name); sa != nil { resp.Error = NewJSStreamNameExistRestoreFailedError() s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } // Raft group selection and placement. rg, err := js.createGroupForStream(ci, cfg) if err != nil { resp.Error = NewJSClusterNoPeersError(err) s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } // Pick a preferred leader. rg.setPreferred() sa := &streamAssignment{Group: rg, Sync: syncSubjForStream(), Config: cfg, Subject: subject, Reply: reply, Client: ci, Created: time.Now().UTC()} // Now add in our restore state and pre-select a peer to handle the actual receipt of the snapshot. sa.Restore = &req.State cc.meta.Propose(encodeAddStreamAssignment(sa)) } // Determine if all peers for this group are offline. func (s *Server) allPeersOffline(rg *raftGroup) bool { if rg == nil { return false } // Check to see if this stream has any servers online to respond. for _, peer := range rg.Peers { if si, ok := s.nodeToInfo.Load(peer); ok && si != nil { if !si.(nodeInfo).offline { return false } } } return true } // This will do a scatter and gather operation for all streams for this account. This is only called from metadata leader. // This will be running in a separate Go routine. func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, filter string, offset int, subject, reply string, rmsg []byte) { defer s.grWG.Done() js, cc := s.getJetStreamCluster() if js == nil || cc == nil { return } js.mu.RLock() var streams []*streamAssignment for _, sa := range cc.streams[acc.Name] { if IsNatsErr(sa.err, JSClusterNotAssignedErr) { continue } if filter != _EMPTY_ { // These could not have subjects auto-filled in since they are raw and unprocessed. 
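// When no subjects are configured we fall back to matching the filter against the stream name itself.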
if len(sa.Config.Subjects) == 0 { if SubjectsCollide(filter, sa.Config.Name) { streams = append(streams, sa) } } else { for _, subj := range sa.Config.Subjects { if SubjectsCollide(filter, subj) { streams = append(streams, sa) break } } } } else { streams = append(streams, sa) } } // Needs to be sorted for offsets etc. if len(streams) > 1 { sort.Slice(streams, func(i, j int) bool { return strings.Compare(streams[i].Config.Name, streams[j].Config.Name) < 0 }) } scnt := len(streams) if offset > scnt { offset = scnt } if offset > 0 { streams = streams[offset:] } if len(streams) > JSApiListLimit { streams = streams[:JSApiListLimit] } var resp = JSApiStreamListResponse{ ApiResponse: ApiResponse{Type: JSApiStreamListResponseType}, Streams: make([]*StreamInfo, 0, len(streams)), } js.mu.RUnlock() if len(streams) == 0 { resp.Limit = JSApiListLimit resp.Offset = offset s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(resp)) return } // Create an inbox for our responses and send out our requests. s.mu.Lock() inbox := s.newRespInbox() rc := make(chan *StreamInfo, len(streams)) // Store our handler. s.sys.replies[inbox] = func(sub *subscription, _ *client, _ *Account, subject, _ string, msg []byte) { var si StreamInfo if err := json.Unmarshal(msg, &si); err != nil { s.Warnf("Error unmarshaling clustered stream info response:%v", err) return } select { case rc <- &si: default: s.Warnf("Failed placing remote stream info result on internal channel") } } s.mu.Unlock() // Cleanup after. defer func() { s.mu.Lock() if s.sys != nil && s.sys.replies != nil { delete(s.sys.replies, inbox) } s.mu.Unlock() }() var missingNames []string sent := map[string]int{} // Send out our requests here. js.mu.RLock() for _, sa := range streams { if s.allPeersOffline(sa.Group) { // Place offline onto our results by hand here. si := &StreamInfo{Config: *sa.Config, Created: sa.Created, Cluster: js.offlineClusterInfo(sa.Group)} resp.Streams = append(resp.Streams, si) missingNames = append(missingNames, sa.Config.Name) } else { isubj := fmt.Sprintf(clusterStreamInfoT, sa.Client.serviceAccount(), sa.Config.Name) s.sendInternalMsgLocked(isubj, inbox, nil, nil) sent[sa.Config.Name] = len(sa.consumers) } } // Don"t hold lock. js.mu.RUnlock() const timeout = 4 * time.Second notActive := time.NewTimer(timeout) defer notActive.Stop() LOOP: for len(sent) > 0 { select { case <-s.quitCh: return case <-notActive.C: s.Warnf("Did not receive all stream info results for %q", acc) for sName := range sent { missingNames = append(missingNames, sName) } break LOOP case si := <-rc: consCount := sent[si.Config.Name] if consCount > 0 { si.State.Consumers = consCount } delete(sent, si.Config.Name) resp.Streams = append(resp.Streams, si) // Check to see if we are done. if len(resp.Streams) == len(streams) { break LOOP } } } // Needs to be sorted as well. if len(resp.Streams) > 1 { sort.Slice(resp.Streams, func(i, j int) bool { return strings.Compare(resp.Streams[i].Config.Name, resp.Streams[j].Config.Name) < 0 }) } resp.Total = scnt resp.Limit = JSApiListLimit resp.Offset = offset resp.Missing = missingNames s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(resp)) } // This will do a scatter and gather operation for all consumers for this stream and account. // This will be running in a separate Go routine. 
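// As with the stream list above, infos are gathered over a response inbox with a timeout and any
// consumers we could not reach are reported in the Missing field of the response.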
func (s *Server) jsClusteredConsumerListRequest(acc *Account, ci *ClientInfo, offset int, stream, subject, reply string, rmsg []byte) { defer s.grWG.Done() js, cc := s.getJetStreamCluster() if js == nil || cc == nil { return } js.mu.RLock() var consumers []*consumerAssignment if sas := cc.streams[acc.Name]; sas != nil { if sa := sas[stream]; sa != nil { // Copy over since we need to sort etc. for _, ca := range sa.consumers { consumers = append(consumers, ca) } } } // Needs to be sorted. if len(consumers) > 1 { sort.Slice(consumers, func(i, j int) bool { return strings.Compare(consumers[i].Name, consumers[j].Name) < 0 }) } ocnt := len(consumers) if offset > ocnt { offset = ocnt } if offset > 0 { consumers = consumers[offset:] } if len(consumers) > JSApiListLimit { consumers = consumers[:JSApiListLimit] } // Send out our requests here. var resp = JSApiConsumerListResponse{ ApiResponse: ApiResponse{Type: JSApiConsumerListResponseType}, Consumers: []*ConsumerInfo{}, } js.mu.RUnlock() if len(consumers) == 0 { resp.Limit = JSApiListLimit resp.Offset = offset s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(resp)) return } // Create an inbox for our responses and send out requests. s.mu.Lock() inbox := s.newRespInbox() rc := make(chan *ConsumerInfo, len(consumers)) // Store our handler. s.sys.replies[inbox] = func(sub *subscription, _ *client, _ *Account, subject, _ string, msg []byte) { var ci ConsumerInfo if err := json.Unmarshal(msg, &ci); err != nil { s.Warnf("Error unmarshaling clustered consumer info response:%v", err) return } select { case rc <- &ci: default: s.Warnf("Failed placing consumer info result on internal chan") } } s.mu.Unlock() // Cleanup after. defer func() { s.mu.Lock() if s.sys != nil && s.sys.replies != nil { delete(s.sys.replies, inbox) } s.mu.Unlock() }() var missingNames []string sent := map[string]struct{}{} // Send out our requests here. js.mu.RLock() for _, ca := range consumers { if s.allPeersOffline(ca.Group) { // Place offline onto our results by hand here. ci := &ConsumerInfo{Config: ca.Config, Created: ca.Created, Cluster: js.offlineClusterInfo(ca.Group)} resp.Consumers = append(resp.Consumers, ci) missingNames = append(missingNames, ca.Name) } else { isubj := fmt.Sprintf(clusterConsumerInfoT, ca.Client.serviceAccount(), stream, ca.Name) s.sendInternalMsgLocked(isubj, inbox, nil, nil) sent[ca.Name] = struct{}{} } } // Don't hold lock. js.mu.RUnlock() const timeout = 4 * time.Second notActive := time.NewTimer(timeout) defer notActive.Stop() LOOP: for len(sent) > 0 { select { case <-s.quitCh: return case <-notActive.C: s.Warnf("Did not receive all consumer info results for '%s > %s'", acc, stream) for cName := range sent { missingNames = append(missingNames, cName) } break LOOP case ci := <-rc: delete(sent, ci.Name) resp.Consumers = append(resp.Consumers, ci) // Check to see if we are done. if len(resp.Consumers) == len(consumers) { break LOOP } } } // Needs to be sorted as well.
if len(resp.Consumers) > 1 { sort.Slice(resp.Consumers, func(i, j int) bool { return strings.Compare(resp.Consumers[i].Name, resp.Consumers[j].Name) < 0 }) } resp.Total = len(resp.Consumers) resp.Limit = JSApiListLimit resp.Offset = offset resp.Missing = missingNames s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(resp)) } func encodeStreamPurge(sp *streamPurge) []byte { var bb bytes.Buffer bb.WriteByte(byte(purgeStreamOp)) json.NewEncoder(&bb).Encode(sp) return bb.Bytes() } func decodeStreamPurge(buf []byte) (*streamPurge, error) { var sp streamPurge err := json.Unmarshal(buf, &sp) return &sp, err } func (s *Server) jsClusteredConsumerDeleteRequest(ci *ClientInfo, acc *Account, stream, consumer, subject, reply string, rmsg []byte) { js, cc := s.getJetStreamCluster() if js == nil || cc == nil { return } js.mu.Lock() defer js.mu.Unlock() var resp = JSApiConsumerDeleteResponse{ApiResponse: ApiResponse{Type: JSApiConsumerDeleteResponseType}} sa := js.streamAssignment(acc.Name, stream) if sa == nil { resp.Error = NewJSStreamNotFoundError() s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } if sa.consumers == nil { resp.Error = NewJSConsumerNotFoundError() s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } oca := sa.consumers[consumer] if oca == nil { resp.Error = NewJSConsumerNotFoundError() s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } oca.deleted = true ca := &consumerAssignment{Group: oca.Group, Stream: stream, Name: consumer, Config: oca.Config, Subject: subject, Reply: reply, Client: ci} cc.meta.Propose(encodeDeleteConsumerAssignment(ca)) } func encodeMsgDelete(md *streamMsgDelete) []byte { var bb bytes.Buffer bb.WriteByte(byte(deleteMsgOp)) json.NewEncoder(&bb).Encode(md) return bb.Bytes() } func decodeMsgDelete(buf []byte) (*streamMsgDelete, error) { var md streamMsgDelete err := json.Unmarshal(buf, &md) return &md, err } func (s *Server) jsClusteredMsgDeleteRequest(ci *ClientInfo, acc *Account, mset *stream, stream, subject, reply string, req *JSApiMsgDeleteRequest, rmsg []byte) { js, cc := s.getJetStreamCluster() if js == nil || cc == nil { return } js.mu.Lock() sa := js.streamAssignment(acc.Name, stream) if sa == nil { s.Debugf("Message delete failed, could not locate stream '%s > %s'", acc.Name, stream) js.mu.Unlock() return } // Check for single replica items.
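// With a raft node present the delete is proposed to the stream group; otherwise (single replica)
// it is applied directly to the local stream below.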
if n := sa.Group.node; n != nil { md := streamMsgDelete{Seq: req.Seq, NoErase: req.NoErase, Stream: stream, Subject: subject, Reply: reply, Client: ci} n.Propose(encodeMsgDelete(&md)) js.mu.Unlock() return } js.mu.Unlock() if mset == nil { return } var err error var removed bool if req.NoErase { removed, err = mset.removeMsg(req.Seq) } else { removed, err = mset.eraseMsg(req.Seq) } var resp = JSApiMsgDeleteResponse{ApiResponse: ApiResponse{Type: JSApiMsgDeleteResponseType}} if err != nil { resp.Error = NewJSStreamMsgDeleteFailedError(err, Unless(err)) } else if !removed { resp.Error = NewJSSequenceNotFoundError(req.Seq) } else { resp.Success = true } s.sendAPIResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(resp)) } func encodeAddStreamAssignment(sa *streamAssignment) []byte { var bb bytes.Buffer bb.WriteByte(byte(assignStreamOp)) json.NewEncoder(&bb).Encode(sa) return bb.Bytes() } func encodeUpdateStreamAssignment(sa *streamAssignment) []byte { var bb bytes.Buffer bb.WriteByte(byte(updateStreamOp)) json.NewEncoder(&bb).Encode(sa) return bb.Bytes() } func encodeDeleteStreamAssignment(sa *streamAssignment) []byte { var bb bytes.Buffer bb.WriteByte(byte(removeStreamOp)) json.NewEncoder(&bb).Encode(sa) return bb.Bytes() } func decodeStreamAssignment(buf []byte) (*streamAssignment, error) { var sa streamAssignment err := json.Unmarshal(buf, &sa) if err != nil { return nil, err } fixCfgMirrorWithDedupWindow(sa.Config) return &sa, err } // createGroupForConsumer will create a new group from same peer set as the stream. func (cc *jetStreamCluster) createGroupForConsumer(cfg *ConsumerConfig, sa *streamAssignment) *raftGroup { if len(sa.Group.Peers) == 0 || cfg.Replicas > len(sa.Group.Peers) { return nil } peers := copyStrings(sa.Group.Peers) var _ss [5]string active := _ss[:0] // Calculate all active peers. for _, peer := range peers { if sir, ok := cc.s.nodeToInfo.Load(peer); ok && sir != nil { if !sir.(nodeInfo).offline { active = append(active, peer) } } } if quorum := cfg.Replicas/2 + 1; quorum > len(active) { // Not enough active to satisfy the request. return nil } // If we want less then our parent stream, select from active. if cfg.Replicas > 0 && cfg.Replicas < len(peers) { // Pedantic in case stream is say R5 and consumer is R3 and 3 or more offline, etc. if len(active) < cfg.Replicas { return nil } // First shuffle the active peers and then select to account for replica = 1. rand.Shuffle(len(active), func(i, j int) { active[i], active[j] = active[j], active[i] }) peers = active[:cfg.Replicas] } storage := sa.Config.Storage if cfg.MemoryStorage { storage = MemoryStorage } return &raftGroup{Name: groupNameForConsumer(peers, storage), Storage: storage, Peers: peers} } // jsClusteredConsumerRequest is first point of entry to create a consumer with R > 1. 
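// It applies consumer defaults and limit checks, then either updates a copy of an existing assignment
// or creates a new raft group drawn from the stream's peer set, and finally proposes the result to the
// meta group.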
func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subject, reply string, rmsg []byte, stream string, cfg *ConsumerConfig) { js, cc := s.getJetStreamCluster() if js == nil || cc == nil { return } var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}} streamCfg, ok := js.clusterStreamConfig(acc.Name, stream) if !ok { resp.Error = NewJSStreamNotFoundError() s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } selectedLimits, _, _, apiErr := acc.selectLimits(&streamCfg) if apiErr != nil { resp.Error = apiErr s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } srvLim := &s.getOpts().JetStreamLimits // Make sure we have sane defaults setConsumerConfigDefaults(cfg, srvLim, selectedLimits) if err := checkConsumerCfg(cfg, srvLim, &streamCfg, acc, selectedLimits, false); err != nil { resp.Error = err s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } js.mu.Lock() defer js.mu.Unlock() // Lookup the stream assignment. sa := js.streamAssignment(acc.Name, stream) if sa == nil { resp.Error = NewJSStreamNotFoundError() s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } // Check for max consumers here to short circuit if possible. // Start with limit on a stream, but if one is defined at the level of the account // and is lower, use that limit. maxc := sa.Config.MaxConsumers if maxc <= 0 || (selectedLimits.MaxConsumers > 0 && selectedLimits.MaxConsumers < maxc) { maxc = selectedLimits.MaxConsumers } if maxc > 0 { // Don"t count DIRECTS. total := 0 for _, ca := range sa.consumers { if ca.Config != nil && !ca.Config.Direct { total++ } } if total >= maxc { resp.Error = NewJSMaximumConsumersLimitError() s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } } // Also short circuit if DeliverLastPerSubject is set with no FilterSubject. if cfg.DeliverPolicy == DeliverLastPerSubject { if cfg.FilterSubject == _EMPTY_ { resp.Error = NewJSConsumerInvalidPolicyError(fmt.Errorf("consumer delivery policy is deliver last per subject, but FilterSubject is not set")) s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } } // Setup proper default for ack wait if we are in explicit ack mode. if cfg.AckWait == 0 && (cfg.AckPolicy == AckExplicit || cfg.AckPolicy == AckAll) { cfg.AckWait = JsAckWaitDefault } // Setup default of -1, meaning no limit for MaxDeliver. if cfg.MaxDeliver == 0 { cfg.MaxDeliver = -1 } // Set proper default for max ack pending if we are ack explicit and none has been set. if cfg.AckPolicy == AckExplicit && cfg.MaxAckPending == 0 { cfg.MaxAckPending = JsDefaultMaxAckPending } var ca *consumerAssignment var oname string // See if we have an existing one already under same durable name or // if name was set by the user. if isDurableConsumer(cfg) || cfg.Name != _EMPTY_ { if cfg.Name != _EMPTY_ { oname = cfg.Name } else { oname = cfg.Durable } if ca = sa.consumers[oname]; ca != nil && !ca.deleted { // Do quick sanity check on new cfg to prevent here if possible. if err := acc.checkNewConsumerConfig(ca.Config, cfg); err != nil { resp.Error = NewJSConsumerCreateError(err, Unless(err)) s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } } } // If this is new consumer. 
if ca == nil { rg := cc.createGroupForConsumer(cfg, sa) if rg == nil { resp.Error = NewJSInsufficientResourcesError() s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } // Pick a preferred leader. rg.setPreferred() // Inherit cluster from stream. rg.Cluster = sa.Group.Cluster // We need to set the ephemeral here before replicating. if !isDurableConsumer(cfg) { // We chose to have ephemerals be R=1 unless stream is interest or workqueue. // Consumer can override. if sa.Config.Retention == LimitsPolicy && cfg.Replicas <= 1 { rg.Peers = []string{rg.Preferred} rg.Name = groupNameForConsumer(rg.Peers, rg.Storage) } if cfg.Name != _EMPTY_ { oname = cfg.Name } else { // Make sure name is unique. for { oname = createConsumerName() if sa.consumers != nil { if sa.consumers[oname] != nil { continue } } break } } } if len(rg.Peers) > 1 { if maxHaAssets := s.getOpts().JetStreamLimits.MaxHAAssets; maxHaAssets != 0 { for _, peer := range rg.Peers { if ni, ok := s.nodeToInfo.Load(peer); ok { ni := ni.(nodeInfo) if stats := ni.stats; stats != nil && stats.HAAssets > maxHaAssets { resp.Error = NewJSInsufficientResourcesError() s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) s.Warnf("%s@%s (HA Asset Count: %d) exceeds max ha asset limit of %d"+ " for (durable) consumer %s placement on stream %s", ni.name, ni.cluster, ni.stats.HAAssets, maxHaAssets, oname, stream) return } } } } } ca = &consumerAssignment{ Group: rg, Stream: stream, Name: oname, Config: cfg, Subject: subject, Reply: reply, Client: ci, Created: time.Now().UTC(), } } else { nca := ca.copyGroup() rBefore := ca.Config.replicas(sa.Config) rAfter := cfg.replicas(sa.Config) var curLeader string if rBefore != rAfter { // We are modifying nodes here. We want to do our best to preserve the current leader. // We have support now from above that guarantees we are in our own Go routine, so can // ask for stream info from the stream leader to make sure we keep the leader in the new list. if !s.allPeersOffline(ca.Group) { // Need to release js lock. js.mu.Unlock() if ci, err := s.sysRequest(&ConsumerInfo{}, clusterConsumerInfoT, ci.serviceAccount(), sa.Config.Name, cfg.Durable); err != nil { s.Warnf("Did not receive consumer info results for '%s > %s > %s' due to: %s", acc, sa.Config.Name, cfg.Durable, err) } else if cl := ci.(*ConsumerInfo).Cluster; cl != nil { curLeader = getHash(cl.Leader) } // Re-acquire here.
js.mu.Lock() } } if rBefore < rAfter { newPeerSet := nca.Group.Peers // scale up by adding new members from the stream peer set that are not yet in the consumer peer set streamPeerSet := copyStrings(sa.Group.Peers) rand.Shuffle(rAfter, func(i, j int) { streamPeerSet[i], streamPeerSet[j] = streamPeerSet[j], streamPeerSet[i] }) for _, p := range streamPeerSet { found := false for _, sp := range newPeerSet { if sp == p { found = true break } } if !found { newPeerSet = append(newPeerSet, p) if len(newPeerSet) == rAfter { break } } } nca.Group.Peers = newPeerSet nca.Group.Preferred = curLeader } else if rBefore > rAfter { newPeerSet := nca.Group.Peers // mark leader preferred and move it to end nca.Group.Preferred = curLeader if nca.Group.Preferred != _EMPTY_ { for i, p := range newPeerSet { if nca.Group.Preferred == p { newPeerSet[i] = newPeerSet[len(newPeerSet)-1] newPeerSet[len(newPeerSet)-1] = p } } } // scale down by removing peers from the end newPeerSet = newPeerSet[len(newPeerSet)-rAfter:] nca.Group.Peers = newPeerSet } // Update config and client info on copy of existing. nca.Config = cfg nca.Client = ci nca.Subject = subject nca.Reply = reply ca = nca } eca := encodeAddConsumerAssignment(ca) // Mark this as pending. if sa.consumers == nil { sa.consumers = make(map[string]*consumerAssignment) } sa.consumers[ca.Name] = ca // Do formal proposal. cc.meta.Propose(eca) } func encodeAddConsumerAssignment(ca *consumerAssignment) []byte { var bb bytes.Buffer bb.WriteByte(byte(assignConsumerOp)) json.NewEncoder(&bb).Encode(ca) return bb.Bytes() } func encodeDeleteConsumerAssignment(ca *consumerAssignment) []byte { var bb bytes.Buffer bb.WriteByte(byte(removeConsumerOp)) json.NewEncoder(&bb).Encode(ca) return bb.Bytes() } func decodeConsumerAssignment(buf []byte) (*consumerAssignment, error) { var ca consumerAssignment err := json.Unmarshal(buf, &ca) return &ca, err } func encodeAddConsumerAssignmentCompressed(ca *consumerAssignment) []byte { b, err := json.Marshal(ca) if err != nil { return nil } // TODO(dlc) - Streaming better approach here probably. 
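// The compressed form is the op byte followed by the S2-compressed JSON encoding of the assignment; decodeConsumerAssignmentCompressed below reverses this.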
var bb bytes.Buffer bb.WriteByte(byte(assignCompressedConsumerOp)) bb.Write(s2.Encode(nil, b)) return bb.Bytes() } func decodeConsumerAssignmentCompressed(buf []byte) (*consumerAssignment, error) { var ca consumerAssignment js, err := s2.Decode(nil, buf) if err != nil { return nil, err } err = json.Unmarshal(js, &ca) return &ca, err } var errBadStreamMsg = errors.New("jetstream cluster bad replicated stream msg") func decodeStreamMsg(buf []byte) (subject, reply string, hdr, msg []byte, lseq uint64, ts int64, err error) { var le = binary.LittleEndian if len(buf) < 26 { return _EMPTY_, _EMPTY_, nil, nil, 0, 0, errBadStreamMsg } lseq = le.Uint64(buf) buf = buf[8:] ts = int64(le.Uint64(buf)) buf = buf[8:] sl := int(le.Uint16(buf)) buf = buf[2:] if len(buf) < sl { return _EMPTY_, _EMPTY_, nil, nil, 0, 0, errBadStreamMsg } subject = string(buf[:sl]) buf = buf[sl:] if len(buf) < 2 { return _EMPTY_, _EMPTY_, nil, nil, 0, 0, errBadStreamMsg } rl := int(le.Uint16(buf)) buf = buf[2:] if len(buf) < rl { return _EMPTY_, _EMPTY_, nil, nil, 0, 0, errBadStreamMsg } reply = string(buf[:rl]) buf = buf[rl:] if len(buf) < 2 { return _EMPTY_, _EMPTY_, nil, nil, 0, 0, errBadStreamMsg } hl := int(le.Uint16(buf)) buf = buf[2:] if len(buf) < hl { return _EMPTY_, _EMPTY_, nil, nil, 0, 0, errBadStreamMsg } if hdr = buf[:hl]; len(hdr) == 0 { hdr = nil } buf = buf[hl:] if len(buf) < 4 { return _EMPTY_, _EMPTY_, nil, nil, 0, 0, errBadStreamMsg } ml := int(le.Uint32(buf)) buf = buf[4:] if len(buf) < ml { return _EMPTY_, _EMPTY_, nil, nil, 0, 0, errBadStreamMsg } if msg = buf[:ml]; len(msg) == 0 { msg = nil } return subject, reply, hdr, msg, lseq, ts, nil } // Helper to return if compression allowed. func (mset *stream) compressAllowed() bool { mset.clMu.Lock() defer mset.clMu.Unlock() return mset.compressOK } func encodeStreamMsg(subject, reply string, hdr, msg []byte, lseq uint64, ts int64) []byte { return encodeStreamMsgAllowCompress(subject, reply, hdr, msg, lseq, ts, false) } // Threshold for compression. // TODO(dlc) - Eventually make configurable. const compressThreshold = 4 * 1024 // If allowed and contents over the threshold we will compress. func encodeStreamMsgAllowCompress(subject, reply string, hdr, msg []byte, lseq uint64, ts int64, compressOK bool) []byte { shouldCompress := compressOK && len(subject)+len(reply)+len(hdr)+len(msg) > compressThreshold elen := 1 + 8 + 8 + len(subject) + len(reply) + len(hdr) + len(msg) elen += (2 + 2 + 2 + 4) // Encoded lengths, 4bytes // TODO(dlc) - check sizes of subject, reply and hdr, make sure uint16 ok. buf := make([]byte, elen) buf[0] = byte(streamMsgOp) var le = binary.LittleEndian wi := 1 le.PutUint64(buf[wi:], lseq) wi += 8 le.PutUint64(buf[wi:], uint64(ts)) wi += 8 le.PutUint16(buf[wi:], uint16(len(subject))) wi += 2 copy(buf[wi:], subject) wi += len(subject) le.PutUint16(buf[wi:], uint16(len(reply))) wi += 2 copy(buf[wi:], reply) wi += len(reply) le.PutUint16(buf[wi:], uint16(len(hdr))) wi += 2 if len(hdr) > 0 { copy(buf[wi:], hdr) wi += len(hdr) } le.PutUint32(buf[wi:], uint32(len(msg))) wi += 4 if len(msg) > 0 { copy(buf[wi:], msg) wi += len(msg) } // Check if we should compress. if shouldCompress { nbuf := make([]byte, s2.MaxEncodedLen(elen)) nbuf[0] = byte(compressedStreamMsgOp) ebuf := s2.Encode(nbuf[1:], buf[1:wi]) // Only pay cost of decode the other side if we compressed. // S2 will allow us to try without major penalty for non-compressable data. 
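// Only keep the compressed encoding when it is actually smaller than the raw framing; otherwise fall through and send the uncompressed buffer unchanged.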
if len(ebuf) < wi { nbuf = nbuf[:len(ebuf)+1] buf, wi = nbuf, len(nbuf) } } return buf[:wi] } // StreamSnapshot is used for snapshotting and out of band catch up in clustered mode. type streamSnapshot struct { Msgs uint64 `json:"messages"` Bytes uint64 `json:"bytes"` FirstSeq uint64 `json:"first_seq"` LastSeq uint64 `json:"last_seq"` Failed uint64 `json:"clfs"` Deleted []uint64 `json:"deleted,omitempty"` } // Grab a snapshot of a stream for clustered mode. func (mset *stream) stateSnapshot() []byte { mset.mu.RLock() defer mset.mu.RUnlock() return mset.stateSnapshotLocked() } // Grab a snapshot of a stream for clustered mode. // Lock should be held. func (mset *stream) stateSnapshotLocked() []byte { state := mset.store.State() snap := &streamSnapshot{ Msgs: state.Msgs, Bytes: state.Bytes, FirstSeq: state.FirstSeq, LastSeq: state.LastSeq, Failed: mset.clfs, Deleted: state.Deleted, } b, _ := json.Marshal(snap) return b } // Will check if we can do message compression in RAFT and catchup logic. func (mset *stream) checkAllowMsgCompress(peers []string) { allowed := true for _, id := range peers { sir, ok := mset.srv.nodeToInfo.Load(id) if !ok || sir == nil { allowed = false break } // Check for capability. if si := sir.(nodeInfo); si.cfg == nil || !si.cfg.CompressOK { allowed = false break } } mset.mu.Lock() mset.compressOK = allowed mset.mu.Unlock() } // To warn when we are getting too far behind from what has been proposed vs what has been committed. const streamLagWarnThreshold = 10_000 // processClusteredMsg will propose the inbound message to the underlying raft group. func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg []byte) error { // For possible error response. var response []byte mset.mu.RLock() canRespond := !mset.cfg.NoAck && len(reply) > 0 name, stype, store := mset.cfg.Name, mset.cfg.Storage, mset.store s, js, jsa, st, rf, tierName, outq, node := mset.srv, mset.js, mset.jsa, mset.cfg.Storage, mset.cfg.Replicas, mset.tier, mset.outq, mset.node maxMsgSize, lseq, clfs := int(mset.cfg.MaxMsgSize), mset.lseq, mset.clfs isLeader, isSealed := mset.isLeader(), mset.cfg.Sealed mset.mu.RUnlock() // This should not happen but possible now that we allow scale up, and scale down where this could trigger. if node == nil { return mset.processJetStreamMsg(subject, reply, hdr, msg, 0, 0) } // Check that we are the leader. This can be false if we have scaled up from an R1 that had inbound queued messages. if !isLeader { return NewJSClusterNotLeaderError() } // Bail here if sealed. if isSealed { var resp = JSPubAckResponse{PubAck: &PubAck{Stream: mset.name()}, Error: NewJSStreamSealedError()} b, _ := json.Marshal(resp) mset.outq.sendMsg(reply, b) return NewJSStreamSealedError() } // Check here pre-emptively if we have exceeded this server limits. if js.limitsExceeded(stype) { s.resourcesExeededError() if canRespond { b, _ := json.Marshal(&JSPubAckResponse{PubAck: &PubAck{Stream: name}, Error: NewJSInsufficientResourcesError()}) outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, b, nil, 0)) } // Stepdown regardless. if node := mset.raftNode(); node != nil { node.StepDown() } return NewJSInsufficientResourcesError() } // Check here pre-emptively if we have exceeded our account limits. 
var exceeded bool jsa.usageMu.Lock() jsaLimits, ok := jsa.limits[tierName] if !ok { jsa.usageMu.Unlock() err := fmt.Errorf("no JetStream resource limits found account: %q", jsa.acc().Name) s.RateLimitWarnf(err.Error()) if canRespond { var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: name}} resp.Error = NewJSNoLimitsError() response, _ = json.Marshal(resp) outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, response, nil, 0)) } return err } t, ok := jsa.usage[tierName] if !ok { t = &jsaStorage{} jsa.usage[tierName] = t } if st == MemoryStorage { total := t.total.store + int64(memStoreMsgSize(subject, hdr, msg)*uint64(rf)) if jsaLimits.MaxMemory > 0 && total > jsaLimits.MaxMemory { exceeded = true } } else { total := t.total.store + int64(fileStoreMsgSize(subject, hdr, msg)*uint64(rf)) if jsaLimits.MaxStore > 0 && total > jsaLimits.MaxStore { exceeded = true } } jsa.usageMu.Unlock() // If we have exceeded our account limits go ahead and return. if exceeded { err := fmt.Errorf("JetStream resource limits exceeded for account: %q", jsa.acc().Name) s.RateLimitWarnf(err.Error()) if canRespond { var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: name}} resp.Error = NewJSAccountResourcesExceededError() response, _ = json.Marshal(resp) outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, response, nil, 0)) } return err } // Check msgSize if we have a limit set there. Again this works if it goes through but better to be pre-emptive. if maxMsgSize >= 0 && (len(hdr)+len(msg)) > maxMsgSize { err := fmt.Errorf("JetStream message size exceeds limits for '%s > %s'", jsa.acc().Name, mset.cfg.Name) s.RateLimitWarnf(err.Error()) if canRespond { var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: name}} resp.Error = NewJSStreamMessageExceedsMaximumError() response, _ = json.Marshal(resp) outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, response, nil, 0)) } return err } // Some header checks can be checked pre proposal. Most can not. if len(hdr) > 0 { // For CAS operations, e.g. ExpectedLastSeqPerSubject, we can also check here and not have to go through. // Can only precheck for seq != 0. if seq, exists := getExpectedLastSeqPerSubject(hdr); exists && store != nil && seq > 0 { var smv StoreMsg var fseq uint64 sm, err := store.LoadLastMsg(subject, &smv) if sm != nil { fseq = sm.seq } if err != nil || fseq != seq { if canRespond { var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: name}} resp.PubAck = &PubAck{Stream: name} resp.Error = NewJSStreamWrongLastSequenceError(fseq) b, _ := json.Marshal(resp) outq.sendMsg(reply, b) } return fmt.Errorf("last sequence by subject mismatch: %d vs %d", seq, fseq) } } // Expected stream name can also be pre-checked. if sname := getExpectedStream(hdr); sname != _EMPTY_ && sname != name { if canRespond { var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: name}} resp.PubAck = &PubAck{Stream: name} resp.Error = NewJSStreamNotMatchError() b, _ := json.Marshal(resp) outq.sendMsg(reply, b) } return errors.New("expected stream does not match") } } // Since we encode header len as u16 make sure we do not exceed. // Again this works if it goes through but better to be pre-emptive.
if len(hdr) > math.MaxUint16 { err := fmt.Errorf("JetStream header size exceeds limits for '%s > %s'", jsa.acc().Name, mset.cfg.Name) s.RateLimitWarnf(err.Error()) if canRespond { var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: name}} resp.Error = NewJSStreamHeaderExceedsMaximumError() response, _ = json.Marshal(resp) outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, response, nil, 0)) } return err } // Proceed with proposing this message. // We only use mset.clseq for clustering and in case we run ahead of actual commits. // Check if we need to set initial value here. mset.clMu.Lock() if mset.clseq == 0 || mset.clseq < lseq { // Re-capture lseq, clfs = mset.lastSeqAndCLFS() mset.clseq = lseq + clfs } esm := encodeStreamMsgAllowCompress(subject, reply, hdr, msg, mset.clseq, time.Now().UnixNano(), mset.compressOK) mset.clseq++ // Do proposal. err := node.Propose(esm) if err != nil && mset.clseq > 0 { mset.clseq-- } // Check to see if we are being overrun. // TODO(dlc) - Make this a limit where we drop messages to protect ourselves, but allow to be configured. if mset.clseq-(lseq+clfs) > streamLagWarnThreshold { lerr := fmt.Errorf("JetStream stream '%s > %s' has high message lag", jsa.acc().Name, mset.cfg.Name) s.RateLimitWarnf(lerr.Error()) } mset.clMu.Unlock() if err != nil { if canRespond { var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: mset.cfg.Name}} resp.Error = &ApiError{Code: 503, Description: err.Error()} response, _ = json.Marshal(resp) // If we errored out respond here. outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, response, nil, 0)) } } if err != nil && isOutOfSpaceErr(err) { s.handleOutOfSpace(mset) } return err } // For requesting messages post raft snapshot to catch up streams post server restart. // Any deleted msgs etc will be handled inline on catchup. type streamSyncRequest struct { Peer string `json:"peer,omitempty"` FirstSeq uint64 `json:"first_seq"` LastSeq uint64 `json:"last_seq"` } // Given a stream state that represents a snapshot, calculate the sync request based on our current state. func (mset *stream) calculateSyncRequest(state *StreamState, snap *streamSnapshot) *streamSyncRequest { // Quick check if we are already caught up. if state.LastSeq >= snap.LastSeq { return nil } return &streamSyncRequest{FirstSeq: state.LastSeq + 1, LastSeq: snap.LastSeq, Peer: mset.node.ID()} } // processSnapshotDeletes will update our current store based on the snapshot // but only processing deletes and new FirstSeq / purges. func (mset *stream) processSnapshotDeletes(snap *streamSnapshot) { var state StreamState mset.store.FastState(&state) // Always adjust if FirstSeq has moved beyond our state. if snap.FirstSeq > state.FirstSeq { mset.store.Compact(snap.FirstSeq) mset.store.FastState(&state) mset.setLastSeq(state.LastSeq) } // Range the deleted and delete if applicable. for _, dseq := range snap.Deleted { if dseq > state.FirstSeq && dseq <= state.LastSeq { mset.store.RemoveMsg(dseq) } } } func (mset *stream) setCatchupPeer(peer string, lag uint64) { if peer == _EMPTY_ { return } mset.mu.Lock() if mset.catchups == nil { mset.catchups = make(map[string]uint64) } mset.catchups[peer] = lag mset.mu.Unlock() } // Will decrement by one.
func (mset *stream) updateCatchupPeer(peer string) { if peer == _EMPTY_ { return } mset.mu.Lock() if lag := mset.catchups[peer]; lag > 0 { mset.catchups[peer] = lag - 1 } mset.mu.Unlock() } func (mset *stream) clearCatchupPeer(peer string) { mset.mu.Lock() if mset.catchups != nil { delete(mset.catchups, peer) } mset.mu.Unlock() } // Lock should be held. func (mset *stream) clearAllCatchupPeers() { if mset.catchups != nil { mset.catchups = nil } } func (mset *stream) lagForCatchupPeer(peer string) uint64 { mset.mu.RLock() defer mset.mu.RUnlock() if mset.catchups == nil { return 0 } return mset.catchups[peer] } func (mset *stream) hasCatchupPeers() bool { mset.mu.RLock() defer mset.mu.RUnlock() return len(mset.catchups) > 0 } func (mset *stream) setCatchingUp() { mset.mu.Lock() mset.catchup = true mset.mu.Unlock() } func (mset *stream) clearCatchingUp() { mset.mu.Lock() mset.catchup = false mset.mu.Unlock() } func (mset *stream) isCatchingUp() bool { mset.mu.RLock() defer mset.mu.RUnlock() return mset.catchup } // Determine if a non-leader is current. // Lock should be held. func (mset *stream) isCurrent() bool { if mset.node == nil { return true } return mset.node.Current() && !mset.catchup } // Maximum requests for the whole server that can be in flight. const maxConcurrentSyncRequests = 8 var ( errCatchupCorruptSnapshot = errors.New("corrupt stream snapshot detected") errCatchupStalled = errors.New("catchup stalled") errCatchupStreamStopped = errors.New("stream has been stopped") // when a catchup is terminated due to the stream going away. errCatchupBadMsg = errors.New("bad catchup msg") errCatchupWrongSeqForSkip = errors.New("wrong sequence for skipped msg") ) // Process a stream snapshot. func (mset *stream) processSnapshot(snap *streamSnapshot) (e error) { // Update any deletes, etc. mset.processSnapshotDeletes(snap) mset.mu.Lock() var state StreamState mset.clfs = snap.Failed mset.store.FastState(&state) sreq := mset.calculateSyncRequest(&state, snap) s, js, subject, n := mset.srv, mset.js, mset.sa.Sync, mset.node qname := fmt.Sprintf("[ACC:%s] stream '%s' snapshot", mset.acc.Name, mset.cfg.Name) mset.mu.Unlock() // Bug that would cause this to be empty on stream update. if subject == _EMPTY_ { return errCatchupCorruptSnapshot } // Just return if up to date or already exceeded limits. if sreq == nil || js.limitsExceeded(mset.cfg.Storage) { return nil } // Pause the apply channel for our raft group while we catch up. if err := n.PauseApply(); err != nil { return err } defer func() { // Don't bother resuming if server or stream is gone. if e != errCatchupStreamStopped && e != ErrServerNotRunning { n.ResumeApply() } }() // Set our catchup state. mset.setCatchingUp() defer mset.clearCatchingUp() var sub *subscription var err error const activityInterval = 10 * time.Second notActive := time.NewTimer(activityInterval) defer notActive.Stop() defer func() { if sub != nil { s.sysUnsubscribe(sub) } // Make sure any consumers are updated for the pending amounts. mset.mu.Lock() for _, o := range mset.consumers { o.mu.Lock() if o.isLeader() { o.streamNumPending() } o.mu.Unlock() } mset.mu.Unlock() }() var releaseSem bool releaseSyncOutSem := func() { if !releaseSem { return } // Need to use select for the server shutdown case. select { case s.syncOutSem <- struct{}{}: default: } releaseSem = false } // On exit, we will release our semaphore if we acquired it. defer releaseSyncOutSem() // Check our final state when we exit cleanly.
// If this snapshot was for messages no longer held by the leader we want to make sure // we are synched for the next message sequence properly. lastRequested := sreq.LastSeq checkFinalState := func() { if mset != nil { mset.mu.Lock() var state StreamState mset.store.FastState(&state) var didReset bool firstExpected := lastRequested + 1 if state.FirstSeq != firstExpected { // Reset our notion of first. mset.store.Compact(firstExpected) mset.store.FastState(&state) // Make sure last is also correct in case this also moved. mset.lseq = state.LastSeq didReset = true } mset.mu.Unlock() if didReset { s.Warnf("Catchup for stream '%s > %s' resetting first sequence: %d on catchup complete", mset.account(), mset.name(), firstExpected) } } } RETRY: // On retry, we need to release the semaphore we got. Call will be no-op // if releaseSem boolean has not been set to true on successfully getting // the semaphore. releaseSyncOutSem() if n.GroupLeader() == _EMPTY_ { return fmt.Errorf("catchup for stream '%s > %s' aborted, no leader", mset.account(), mset.name()) } // If we have a sub clear that here. if sub != nil { s.sysUnsubscribe(sub) sub = nil } // Block here if we have too many requests in flight. <-s.syncOutSem releaseSem = true if !s.isRunning() { return ErrServerNotRunning } // We may have been blocked for a bit, so the reset needs to ensure that we // consume the already fired timer. if !notActive.Stop() { select { case <-notActive.C: default: } } notActive.Reset(activityInterval) // Grab sync request again on failures. if sreq == nil { mset.mu.Lock() var state StreamState mset.store.FastState(&state) sreq = mset.calculateSyncRequest(&state, snap) mset.mu.Unlock() if sreq == nil { return nil } // Reset notion of lastRequested. lastRequested = sreq.LastSeq } // Used to transfer a message from the wire to another Go routine internally. type im struct { msg []byte reply string } // This is used to notify the leader that it should stop the runCatchup // because we are either bailing out or going to retry due to an error. notifyLeaderStopCatchup := func(mrec *im, err error) { if mrec.reply == _EMPTY_ { return } s.sendInternalMsgLocked(mrec.reply, _EMPTY_, nil, err.Error()) } msgsQ := newIPQueue[*im](s, qname) defer msgsQ.unregister() // Send our catchup request here. reply := syncReplySubject() sub, err = s.sysSubscribe(reply, func(_ *subscription, _ *client, _ *Account, _, reply string, msg []byte) { // Make copies // TODO(dlc) - Since we are using a buffer from the inbound client/route. msgsQ.push(&im{copyBytes(msg), reply}) }) if err != nil { s.Errorf("Could not subscribe to stream catchup: %v", err) goto RETRY } // Send our sync request. b, _ := json.Marshal(sreq) s.sendInternalMsgLocked(subject, reply, nil, b) // Remember when we sent this out to avoid loop spins on errors below. reqSendTime := time.Now() // Clear our sync request and capture last. last := sreq.LastSeq sreq = nil // Run our own select loop here. for qch, lch := n.QuitC(), n.LeadChangeC(); ; { select { case <-msgsQ.ch: notActive.Reset(activityInterval) mrecs := msgsQ.pop() for _, mrec := range mrecs { msg := mrec.msg // Check for eof signaling.
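// A zero-length message is the EOF marker the leader sends from runCatchup once the requested range has been delivered.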
if len(msg) == 0 { msgsQ.recycle(&mrecs) checkFinalState() return nil } if lseq, err := mset.processCatchupMsg(msg); err == nil { if mrec.reply != _EMPTY_ { s.sendInternalMsgLocked(mrec.reply, _EMPTY_, nil, nil) } if lseq >= last { msgsQ.recycle(&mrecs) return nil } } else if isOutOfSpaceErr(err) { notifyLeaderStopCatchup(mrec, err) return err } else if err == NewJSInsufficientResourcesError() { notifyLeaderStopCatchup(mrec, err) if mset.js.limitsExceeded(mset.cfg.Storage) { s.resourcesExeededError() } else { s.Warnf("Catchup for stream '%s > %s' errored, account resources exceeded: %v", mset.account(), mset.name(), err) } msgsQ.recycle(&mrecs) return err } else { notifyLeaderStopCatchup(mrec, err) s.Warnf("Catchup for stream '%s > %s' errored, will retry: %v", mset.account(), mset.name(), err) msgsQ.recycle(&mrecs) // Make sure we do not spin and make things worse. const minRetryWait = 2 * time.Second elapsed := time.Since(reqSendTime) if elapsed < minRetryWait { select { case <-s.quitCh: return ErrServerNotRunning case <-qch: return errCatchupStreamStopped case <-time.After(minRetryWait - elapsed): } } goto RETRY } } msgsQ.recycle(&mrecs) case <-notActive.C: if mrecs := msgsQ.pop(); len(mrecs) > 0 { mrec := mrecs[0] notifyLeaderStopCatchup(mrec, errCatchupStalled) msgsQ.recycle(&mrecs) } s.Warnf("Catchup for stream '%s > %s' stalled", mset.account(), mset.name()) goto RETRY case <-s.quitCh: return ErrServerNotRunning case <-qch: return errCatchupStreamStopped case isLeader := <-lch: if isLeader { n.StepDown() goto RETRY } } } } // processCatchupMsg will be called to process out of band catchup msgs from a sync request. func (mset *stream) processCatchupMsg(msg []byte) (uint64, error) { if len(msg) == 0 { return 0, errCatchupBadMsg } op := entryOp(msg[0]) if op != streamMsgOp && op != compressedStreamMsgOp { return 0, errCatchupBadMsg } mbuf := msg[1:] if op == compressedStreamMsgOp { var err error mbuf, err = s2.Decode(nil, mbuf) if err != nil { panic(err.Error()) } } subj, _, hdr, msg, seq, ts, err := decodeStreamMsg(mbuf) if err != nil { return 0, errCatchupBadMsg } mset.mu.RLock() st := mset.cfg.Storage ddloaded := mset.ddloaded tierName := mset.tier mset.mu.RUnlock() if mset.js.limitsExceeded(st) { return 0, NewJSInsufficientResourcesError() } else if exceeded, apiErr := mset.jsa.limitsExceeded(st, tierName); apiErr != nil { return 0, apiErr } else if exceeded { return 0, NewJSInsufficientResourcesError() } // Put into our store. // Messages to be skipped have no subject or timestamp. // TODO(dlc) - formalize with skipMsgOp if subj == _EMPTY_ && ts == 0 { if lseq := mset.store.SkipMsg(); lseq != seq { return 0, errCatchupWrongSeqForSkip } } else if err := mset.store.StoreRawMsg(subj, hdr, msg, seq, ts); err != nil { return 0, err } // Update our lseq. mset.setLastSeq(seq) // Check for MsgId and if we have one here make sure to update our internal map. if len(hdr) > 0 { if msgId := getMsgId(hdr); msgId != _EMPTY_ { if !ddloaded { mset.mu.Lock() mset.rebuildDedupe() mset.mu.Unlock() } mset.storeMsgId(&ddentry{msgId, seq, ts}) } } return seq, nil } func (mset *stream) handleClusterSyncRequest(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { var sreq streamSyncRequest if err := json.Unmarshal(msg, &sreq); err != nil { // Log error. return } mset.srv.startGoRoutine(func() { mset.runCatchup(reply, &sreq) }) } // Lock should be held.
func (js *jetStream) offlineClusterInfo(rg *raftGroup) *ClusterInfo { s := js.srv ci := &ClusterInfo{Name: s.ClusterName()} for _, peer := range rg.Peers { if sir, ok := s.nodeToInfo.Load(peer); ok && sir != nil { si := sir.(nodeInfo) pi := &PeerInfo{Peer: peer, Name: si.name, Current: false, Offline: true} ci.Replicas = append(ci.Replicas, pi) } } return ci } // clusterInfo will report on the status of the raft group. func (js *jetStream) clusterInfo(rg *raftGroup) *ClusterInfo { if js == nil { return nil } js.mu.RLock() defer js.mu.RUnlock() s := js.srv if rg == nil || rg.node == nil { return &ClusterInfo{ Name: s.ClusterName(), Leader: s.Name(), } } n := rg.node ci := &ClusterInfo{ Name: s.ClusterName(), Leader: s.serverNameForNode(n.GroupLeader()), } now := time.Now() id, peers := n.ID(), n.Peers() // If we are leaderless, do not suppress putting us in the peer list. if ci.Leader == _EMPTY_ { id = _EMPTY_ } for _, rp := range peers { if rp.ID != id && rg.isMember(rp.ID) { var lastSeen time.Duration if now.After(rp.Last) && rp.Last.Unix() != 0 { lastSeen = now.Sub(rp.Last) } current := rp.Current if current && lastSeen > lostQuorumInterval { current = false } // Create a peer info with common settings if the peer has not been seen // yet (which can happen after the whole cluster is stopped and only some // of the nodes are restarted). pi := &PeerInfo{ Current: current, Offline: true, Active: lastSeen, Lag: rp.Lag, Peer: rp.ID, } // If node is found, complete/update the settings. if sir, ok := s.nodeToInfo.Load(rp.ID); ok && sir != nil { si := sir.(nodeInfo) pi.Name, pi.Offline, pi.cluster = si.name, si.offline, si.cluster } else { // If not, then add a name that indicates that the server name // is unknown at this time, and clear the lag since it is misleading // (the node may not have that much lag). // Note: We return now the Peer ID in PeerInfo, so the "(peerID: %s)" // would technically not be required, but keeping it for now. pi.Name, pi.Lag = fmt.Sprintf("Server name unknown at this time (peerID: %s)", rp.ID), 0 } ci.Replicas = append(ci.Replicas, pi) } } // Order the result based on the name so that we get something consistent // when doing repeated stream info in the CLI, etc... sort.Slice(ci.Replicas, func(i, j int) bool { return ci.Replicas[i].Name < ci.Replicas[j].Name }) return ci } func (mset *stream) checkClusterInfo(ci *ClusterInfo) { for _, r := range ci.Replicas { peer := getHash(r.Name) if lag := mset.lagForCatchupPeer(peer); lag > 0 { r.Current = false r.Lag = lag } } } // Return a list of alternates, ranked by preference order to the request, of stream mirrors. // This allows clients to select or get more information about read replicas that could be a // better option to connect to versus the original source. func (js *jetStream) streamAlternates(ci *ClientInfo, stream string) []StreamAlternate { if js == nil { return nil } js.mu.RLock() defer js.mu.RUnlock() s, cc := js.srv, js.cluster // Track our domain. domain := s.getOpts().JetStreamDomain // No clustering just return nil. if cc == nil { return nil } acc, _ := s.LookupAccount(ci.serviceAccount()) if acc == nil { return nil } // Collect our ordering first for clusters. weights := make(map[string]int) all := []string{ci.Cluster} all = append(all, ci.Alternates...) for i := 0; i < len(all); i++ { weights[all[i]] = len(all) - i } var alts []StreamAlternate for _, sa := range cc.streams[acc.Name] { // Add in ourselves and any mirrors. 
if sa.Config.Name == stream || (sa.Config.Mirror != nil && sa.Config.Mirror.Name == stream) { alts = append(alts, StreamAlternate{Name: sa.Config.Name, Domain: domain, Cluster: sa.Group.Cluster}) } } // If just us don't fill in. if len(alts) == 1 { return nil } // Sort based on our weights that originate from the request itself. sort.Slice(alts, func(i, j int) bool { return weights[alts[i].Cluster] > weights[alts[j].Cluster] }) return alts } // Internal request for stream info, this is coming on the wire so do not block here. func (mset *stream) handleClusterStreamInfoRequest(_ *subscription, c *client, _ *Account, subject, reply string, _ []byte) { go mset.processClusterStreamInfoRequest(reply) } func (mset *stream) processClusterStreamInfoRequest(reply string) { mset.mu.RLock() sysc, js, sa, config := mset.sysc, mset.srv.js, mset.sa, mset.cfg isLeader := mset.isLeader() mset.mu.RUnlock() // By design all members will receive this. Normally we only want the leader answering. // But if we have stalled and lost quorum all can respond. if sa != nil && !js.isGroupLeaderless(sa.Group) && !isLeader { return } // If we are not the leader let someone else possibly respond first. if !isLeader { time.Sleep(200 * time.Millisecond) } si := &StreamInfo{ Created: mset.createdTime(), State: mset.state(), Config: config, Cluster: js.clusterInfo(mset.raftGroup()), Sources: mset.sourcesInfo(), Mirror: mset.mirrorInfo(), } // Check for out of band catchups. if mset.hasCatchupPeers() { mset.checkClusterInfo(si.Cluster) } sysc.sendInternalMsg(reply, _EMPTY_, nil, si) } // 64MB for now, for the total server. This is max we will blast out if asked to // do so to another server for purposes of catchups. // This number should be ok on 1Gbit interface. const defaultMaxTotalCatchupOutBytes = int64(64 * 1024 * 1024) // Current total outstanding catchup bytes. func (s *Server) gcbTotal() int64 { s.gcbMu.RLock() defer s.gcbMu.RUnlock() return s.gcbOut } // Returns true if current total outstanding catchup bytes is below // the maximum configured. func (s *Server) gcbBelowMax() bool { s.gcbMu.RLock() defer s.gcbMu.RUnlock() return s.gcbOut <= s.gcbOutMax } // Adds `sz` to the server's total outstanding catchup bytes and to `localsz` // under the gcbMu lock. The `localsz` points to the local outstanding catchup // bytes of the runCatchup go routine of a given stream. func (s *Server) gcbAdd(localsz *int64, sz int64) { s.gcbMu.Lock() atomic.AddInt64(localsz, sz) s.gcbOut += sz if s.gcbOut >= s.gcbOutMax && s.gcbKick == nil { s.gcbKick = make(chan struct{}) } s.gcbMu.Unlock() } // Removes `sz` from the server's total outstanding catchup bytes and from // `localsz`, but only if `localsz` is non 0, which would signal that gcbSubLast // has already been invoked. See that function for details. // Must be invoked under the gcbMu lock. func (s *Server) gcbSubLocked(localsz *int64, sz int64) { if atomic.LoadInt64(localsz) == 0 { return } atomic.AddInt64(localsz, -sz) s.gcbOut -= sz if s.gcbKick != nil && s.gcbOut < s.gcbOutMax { close(s.gcbKick) s.gcbKick = nil } } // Locked version of gcbSubLocked() func (s *Server) gcbSub(localsz *int64, sz int64) { s.gcbMu.Lock() s.gcbSubLocked(localsz, sz) s.gcbMu.Unlock() } // Similar to gcbSub() but reset `localsz` to 0 at the end under the gcbMu lock. // This will signal further calls to gcbSub() for this `localsz` pointer that // nothing should be done because runCatchup() has exited and any remaining // outstanding bytes value has already been decremented.
func (s *Server) gcbSubLast(localsz *int64) { s.gcbMu.Lock() s.gcbSubLocked(localsz, *localsz) *localsz = 0 s.gcbMu.Unlock() } // Returns our kick chan, or nil if it does not exist. func (s *Server) cbKickChan() <-chan struct{} { s.gcbMu.RLock() defer s.gcbMu.RUnlock() return s.gcbKick } func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) { s := mset.srv defer s.grWG.Done() const maxOutBytes = int64(8 * 1024 * 1024) // 8MB for now, these are all internal, from server to server const maxOutMsgs = int32(32 * 1024) outb := int64(0) outm := int32(0) // On abnormal exit make sure to update global total. defer s.gcbSubLast(&outb) // Flow control processing. ackReplySize := func(subj string) int64 { if li := strings.LastIndexByte(subj, btsep); li > 0 && li < len(subj) { return parseAckReplyNum(subj[li+1:]) } return 0 } nextBatchC := make(chan struct{}, 1) nextBatchC <- struct{}{} remoteQuitCh := make(chan struct{}) // Setup ackReply for flow control. ackReply := syncAckSubject() ackSub, _ := s.sysSubscribe(ackReply, func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { if len(msg) > 0 { s.Warnf("Catchup for stream '%s > %s' was aborted on the remote due to: %q", mset.account(), mset.name(), msg) s.sysUnsubscribe(sub) close(remoteQuitCh) return } sz := ackReplySize(subject) s.gcbSub(&outb, sz) atomic.AddInt32(&outm, -1) mset.updateCatchupPeer(sreq.Peer) // Kick ourselves and anyone else who might have stalled on global state. select { case nextBatchC <- struct{}{}: default: } }) defer s.sysUnsubscribe(ackSub) ackReplyT := strings.ReplaceAll(ackReply, ".*", ".%d") const activityInterval = 5 * time.Second notActive := time.NewTimer(activityInterval) defer notActive.Stop() // Grab our state. var state StreamState mset.mu.RLock() mset.store.FastState(&state) mset.mu.RUnlock() // Reset notion of first if this request wants sequences before our starting sequence // and we would have nothing to send. If we have partial messages still need to send skips for those. if sreq.FirstSeq < state.FirstSeq && state.FirstSeq > sreq.LastSeq { s.Debugf("Catchup for stream '%s > %s' resetting request first sequence from %d to %d", mset.account(), mset.name(), sreq.FirstSeq, state.FirstSeq) sreq.FirstSeq = state.FirstSeq } // Setup sequences to walk through. seq, last := sreq.FirstSeq, sreq.LastSeq mset.setCatchupPeer(sreq.Peer, last-seq) // Check if we can compress during this. compressOk := mset.compressAllowed() var spb int sendNextBatchAndContinue := func(qch chan struct{}) bool { // Update our activity timer. notActive.Reset(activityInterval) // Check if we know we will not enter the loop because we are done. if seq > last { s.Noticef("Catchup for stream '%s > %s' complete", mset.account(), mset.name()) // EOF s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil) return false } // If we already sent a batch, we will try to make sure we process around // half the FC responses - or reach a certain amount of time - before sending // the next batch. if spb > 0 { mw := time.NewTimer(100 * time.Millisecond) for done := false; !done; { select { case <-nextBatchC: done = int(atomic.LoadInt32(&outm)) <= spb/2 case <-mw.C: done = true case <-s.quitCh: return false case <-qch: return false case <-remoteQuitCh: return false } } spb = 0 } var smv StoreMsg for ; seq <= last && atomic.LoadInt64(&outb) <= maxOutBytes && atomic.LoadInt32(&outm) <= maxOutMsgs && s.gcbBelowMax(); seq++ { sm, err := mset.store.LoadMsg(seq, &smv) // If this returned a real error (not just a deleted or missing msg), bail out.
if err != nil && err != ErrStoreMsgNotFound && err != errDeletedMsg { if err == ErrStoreEOF { var state StreamState mset.store.FastState(&state) if seq > state.LastSeq { // The snapshot has a larger last sequence than we have. This could be due to a truncation // when trying to recover after corruption, still not 100% sure. Could be off by 1 too somehow, // but tested a ton of those with no success. s.Warnf("Catchup for stream '%s > %s' completed, but requested sequence %d was larger than current state: %+v", mset.account(), mset.name(), seq, state) // Try our best to redo our invalidated snapshot as well. if n := mset.raftNode(); n != nil { n.InstallSnapshot(mset.stateSnapshot()) } // Signal EOF s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil) return false } } s.Warnf("Error loading message for catchup '%s > %s': %v", mset.account(), mset.name(), err) return false } var em []byte if sm != nil { em = encodeStreamMsgAllowCompress(sm.subj, _EMPTY_, sm.hdr, sm.msg, sm.seq, sm.ts, compressOk) } else { // Skip record for deleted msg. em = encodeStreamMsg(_EMPTY_, _EMPTY_, nil, nil, seq, 0) } // Place size in reply subject for flow control. l := int64(len(em)) reply := fmt.Sprintf(ackReplyT, l) s.gcbAdd(&outb, l) atomic.AddInt32(&outm, 1) s.sendInternalMsgLocked(sendSubject, reply, nil, em) spb++ if seq == last { s.Noticef("Catchup for stream '%s > %s' complete", mset.account(), mset.name()) // EOF s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil) return false } select { case <-remoteQuitCh: return false default: } } return true } // Grab stream quit channel. mset.mu.RLock() qch := mset.qch mset.mu.RUnlock() if qch == nil { return } // Run as long as we are still active and need catchup. // FIXME(dlc) - Purge event? Stream delete? for { // Get this each time, will be non-nil if globally blocked and we will close to wake everyone up. cbKick := s.cbKickChan() select { case <-s.quitCh: return case <-qch: return case <-remoteQuitCh: mset.clearCatchupPeer(sreq.Peer) return case <-notActive.C: s.Warnf("Catchup for stream '%s > %s' stalled", mset.account(), mset.name()) return case <-nextBatchC: if !sendNextBatchAndContinue(qch) { mset.clearCatchupPeer(sreq.Peer) return } case <-cbKick: if !sendNextBatchAndContinue(qch) { mset.clearCatchupPeer(sreq.Peer) return } } } } const jscAllSubj = "$JSC.>" func syncSubjForStream() string { return syncSubject("$JSC.SYNC") } func syncReplySubject() string { return syncSubject("$JSC.R") } func infoReplySubject() string { return syncSubject("$JSC.R") } func syncAckSubject() string { return syncSubject("$JSC.ACK") + ".*" } func syncSubject(pre string) string { var sb strings.Builder sb.WriteString(pre) sb.WriteByte(btsep) var b [replySuffixLen]byte rn := rand.Int63() for i, l := 0, rn; i < len(b); i++ { b[i] = digits[l%base] l /= base } sb.Write(b[:]) return sb.String() } const ( clusterStreamInfoT = "$JSC.SI.%s.%s" clusterConsumerInfoT = "$JSC.CI.%s.%s.%s" jsaUpdatesSubT = "$JSC.ARU.%s.*" jsaUpdatesPubT = "$JSC.ARU.%s.%s" )
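// Illustrative sketch only, not part of the server: assuming it lived in this package, it shows how
// a replicated stream entry built by encodeStreamMsg (op byte plus the little-endian framing parsed
// by decodeStreamMsg above) round-trips. The function name and sample values are hypothetical.
func exampleStreamMsgRoundTrip() error {
	// Encode a message the way the stream leader would before proposing it to the raft group.
	buf := encodeStreamMsg("orders.created", _EMPTY_, nil, []byte("payload"), 22, time.Now().UnixNano())
	if entryOp(buf[0]) != streamMsgOp {
		return errors.New("unexpected op byte")
	}
	// Strip the leading op byte before decoding, mirroring the apply and catchup paths.
	subject, _, hdr, msg, lseq, ts, err := decodeStreamMsg(buf[1:])
	if err != nil {
		return err
	}
	if subject != "orders.created" || hdr != nil || string(msg) != "payload" || lseq != 22 || ts == 0 {
		return errors.New("round trip mismatch")
	}
	return nil
}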