From 55de6238db3a22c149b2fbba79dbe61d55d99372 Mon Sep 17 00:00:00 2001 From: Andy Xie Date: Mon, 4 Nov 2019 13:43:32 +0800 Subject: [PATCH] nsqlookupd: fix write lock starvation --- nsqlookupd/http.go | 7 ++- nsqlookupd/registration_db.go | 89 ++++++++++++++++++----------------- 2 files changed, 52 insertions(+), 44 deletions(-) diff --git a/nsqlookupd/http.go b/nsqlookupd/http.go index c9fa2e1a7..32e7f6665 100644 --- a/nsqlookupd/http.go +++ b/nsqlookupd/http.go @@ -308,7 +308,9 @@ func (s *httpServer) doDebug(w http.ResponseWriter, req *http.Request, ps httpro defer s.ctx.nsqlookupd.DB.RUnlock() data := make(map[string][]map[string]interface{}) - for r, producers := range s.ctx.nsqlookupd.DB.registrationMap { + s.ctx.nsqlookupd.DB.registrationMap.Range(func(k, v interface{}) bool { + producers := v.(ProducerMap) + r := k.(Registration) key := r.Category + ":" + r.Key + ":" + r.SubKey for _, p := range producers { m := map[string]interface{}{ @@ -324,7 +326,8 @@ func (s *httpServer) doDebug(w http.ResponseWriter, req *http.Request, ps httpro } data[key] = append(data[key], m) } - } + return true + }) return data, nil } diff --git a/nsqlookupd/registration_db.go b/nsqlookupd/registration_db.go index ba1d1d09c..38513486d 100644 --- a/nsqlookupd/registration_db.go +++ b/nsqlookupd/registration_db.go @@ -9,7 +9,7 @@ import ( type RegistrationDB struct { sync.RWMutex - registrationMap map[Registration]ProducerMap + registrationMap *sync.Map } type Registration struct { @@ -54,33 +54,26 @@ func (p *Producer) IsTombstoned(lifetime time.Duration) bool { func NewRegistrationDB() *RegistrationDB { return &RegistrationDB{ - registrationMap: make(map[Registration]ProducerMap), + registrationMap: &sync.Map{}, } } // add a registration key func (r *RegistrationDB) AddRegistration(k Registration) { - r.Lock() - defer r.Unlock() - _, ok := r.registrationMap[k] - if !ok { - r.registrationMap[k] = make(map[string]*Producer) - } + r.registrationMap.LoadOrStore(k, make(ProducerMap)) } // add a producer to a registration func (r *RegistrationDB) AddProducer(k Registration, p *Producer) bool { r.Lock() defer r.Unlock() - _, ok := r.registrationMap[k] - if !ok { - r.registrationMap[k] = make(map[string]*Producer) - } - producers := r.registrationMap[k] + val, _ := r.registrationMap.LoadOrStore(k, make(ProducerMap)) + producers := val.(ProducerMap) _, found := producers[p.peerInfo.id] if found == false { producers[p.peerInfo.id] = p } + return !found } @@ -88,10 +81,11 @@ func (r *RegistrationDB) AddProducer(k Registration, p *Producer) bool { func (r *RegistrationDB) RemoveProducer(k Registration, id string) (bool, int) { r.Lock() defer r.Unlock() - producers, ok := r.registrationMap[k] + value, ok := r.registrationMap.Load(k) if !ok { return false, 0 } + producers := value.(ProducerMap) removed := false if _, exists := producers[id]; exists { removed = true @@ -99,14 +93,13 @@ func (r *RegistrationDB) RemoveProducer(k Registration, id string) (bool, int) { // Note: this leaves keys in the DB even if they have empty lists delete(producers, id) + return removed, len(producers) } // remove a Registration and all it's producers func (r *RegistrationDB) RemoveRegistration(k Registration) { - r.Lock() - defer r.Unlock() - delete(r.registrationMap, k) + r.registrationMap.Delete(k) } func (r *RegistrationDB) needFilter(key string, subkey string) bool { @@ -114,59 +107,71 @@ func (r *RegistrationDB) needFilter(key string, subkey string) bool { } func (r *RegistrationDB) FindRegistrations(category string, key string, subkey string) Registrations { - r.RLock() - defer r.RUnlock() if !r.needFilter(key, subkey) { k := Registration{category, key, subkey} - if _, ok := r.registrationMap[k]; ok { + if _, ok := r.registrationMap.Load(k); ok { return Registrations{k} } return Registrations{} } results := Registrations{} - for k := range r.registrationMap { - if !k.IsMatch(category, key, subkey) { - continue + r.registrationMap.Range(func(k, _ interface{}) bool { + if k.(Registration).IsMatch(category, key, subkey) { + results = append(results, k.(Registration)) } - results = append(results, k) - } + return true + }) + return results } func (r *RegistrationDB) FindProducers(category string, key string, subkey string) Producers { - r.RLock() - defer r.RUnlock() if !r.needFilter(key, subkey) { k := Registration{category, key, subkey} - return ProducerMap2Slice(r.registrationMap[k]) + val, _ := r.registrationMap.Load(k) + + r.RLock() + defer r.RUnlock() + return ProducerMap2Slice(val.(ProducerMap)) } + r.RLock() results := make(map[string]struct{}) var retProducers Producers - for k, producers := range r.registrationMap { - if !k.IsMatch(category, key, subkey) { - continue - } - for _, producer := range producers { - _, found := results[producer.peerInfo.id] - if found == false { - results[producer.peerInfo.id] = struct{}{} - retProducers = append(retProducers, producer) + r.registrationMap.Range(func(k, v interface{}) bool { + if k.(Registration).IsMatch(category, key, subkey) { + producers := v.(ProducerMap) + for _, producer := range producers { + _, found := results[producer.peerInfo.id] + if found == false { + results[producer.peerInfo.id] = struct{}{} + retProducers = append(retProducers, producer) + } } } - } + return true + }) + + r.RUnlock() + return retProducers } func (r *RegistrationDB) LookupRegistrations(id string) Registrations { r.RLock() - defer r.RUnlock() + results := Registrations{} - for k, producers := range r.registrationMap { + r.registrationMap.Range(func(k, v interface{}) bool { + producers := v.(ProducerMap) if _, exists := producers[id]; exists { - results = append(results, k) + results = append(results, k.(Registration)) } - } + + return true + }) + + r.RUnlock() + return results }