Skip to content

Commit

Permalink
nsqlookupd: fix write lock startvation
Browse files Browse the repository at this point in the history
  • Loading branch information
andyxning committed Nov 4, 2019
1 parent 5b67f58 commit 7aa646f
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 45 deletions.
7 changes: 5 additions & 2 deletions nsqlookupd/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,9 @@ func (s *httpServer) doDebug(w http.ResponseWriter, req *http.Request, ps httpro
defer s.ctx.nsqlookupd.DB.RUnlock()

data := make(map[string][]map[string]interface{})
for r, producers := range s.ctx.nsqlookupd.DB.registrationMap {
s.ctx.nsqlookupd.DB.registrationMap.Range(func(k, v interface{}) bool {
producers := v.(map[string]*Producer)
r := k.(Registration)
key := r.Category + ":" + r.Key + ":" + r.SubKey
for _, p := range producers {
m := map[string]interface{}{
Expand All @@ -324,7 +326,8 @@ func (s *httpServer) doDebug(w http.ResponseWriter, req *http.Request, ps httpro
}
data[key] = append(data[key], m)
}
}
return true
})

return data, nil
}
83 changes: 40 additions & 43 deletions nsqlookupd/registration_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

type RegistrationDB struct {
sync.RWMutex
registrationMap map[Registration]ProducerMap
registrationMap *sync.Map
}

type Registration struct {
Expand Down Expand Up @@ -54,41 +54,36 @@ func (p *Producer) IsTombstoned(lifetime time.Duration) bool {

func NewRegistrationDB() *RegistrationDB {
return &RegistrationDB{
registrationMap: make(map[Registration]ProducerMap),
registrationMap: &sync.Map{},
}
}

// add a registration key
func (r *RegistrationDB) AddRegistration(k Registration) {
r.Lock()
defer r.Unlock()
_, ok := r.registrationMap[k]
if !ok {
r.registrationMap[k] = make(map[string]*Producer)
}
r.registrationMap.LoadOrStore(k, make(map[string]*Producer))
}

// add a producer to a registration
func (r *RegistrationDB) AddProducer(k Registration, p *Producer) bool {
r.Lock()
defer r.Unlock()
_, ok := r.registrationMap[k]
if !ok {
r.registrationMap[k] = make(map[string]*Producer)
}
producers := r.registrationMap[k]
val, _ := r.registrationMap.LoadOrStore(k, make(map[string]*Producer))
producers := val.(map[string]*Producer)
_, found := producers[p.peerInfo.id]
if found == false {
producers[p.peerInfo.id] = p
}

r.registrationMap.Store(k, producers)
return !found
}

// remove a producer from a registration
func (r *RegistrationDB) RemoveProducer(k Registration, id string) (bool, int) {
r.Lock()
defer r.Unlock()
producers, ok := r.registrationMap[k]
value, ok := r.registrationMap.Load(k)
producers := value.(map[string]*Producer)
if !ok {
return false, 0
}
Expand All @@ -99,74 +94,76 @@ func (r *RegistrationDB) RemoveProducer(k Registration, id string) (bool, int) {

// Note: this leaves keys in the DB even if they have empty lists
delete(producers, id)

r.registrationMap.Store(k, producers)

return removed, len(producers)
}

// remove a Registration and all it's producers
func (r *RegistrationDB) RemoveRegistration(k Registration) {
r.Lock()
defer r.Unlock()
delete(r.registrationMap, k)
r.registrationMap.Delete(k)
}

func (r *RegistrationDB) needFilter(key string, subkey string) bool {
return key == "*" || subkey == "*"
}

func (r *RegistrationDB) FindRegistrations(category string, key string, subkey string) Registrations {
r.RLock()
defer r.RUnlock()
if !r.needFilter(key, subkey) {
k := Registration{category, key, subkey}
if _, ok := r.registrationMap[k]; ok {
if _, ok := r.registrationMap.Load(k); ok {
return Registrations{k}
}
return Registrations{}
}
results := Registrations{}
for k := range r.registrationMap {
if !k.IsMatch(category, key, subkey) {
continue
r.registrationMap.Range(func(k, _ interface{}) bool {
if k.(Registration).IsMatch(category, key, subkey) {
results = append(results, k.(Registration))
}
results = append(results, k)
}
return true
})

return results
}

func (r *RegistrationDB) FindProducers(category string, key string, subkey string) Producers {
r.RLock()
defer r.RUnlock()
if !r.needFilter(key, subkey) {
k := Registration{category, key, subkey}
return ProducerMap2Slice(r.registrationMap[k])
val, _ := r.registrationMap.Load(k)
return ProducerMap2Slice(val.(map[string]*Producer))
}

results := make(map[string]struct{})
var retProducers Producers
for k, producers := range r.registrationMap {
if !k.IsMatch(category, key, subkey) {
continue
}
for _, producer := range producers {
_, found := results[producer.peerInfo.id]
if found == false {
results[producer.peerInfo.id] = struct{}{}
retProducers = append(retProducers, producer)
r.registrationMap.Range(func(k, v interface{}) bool {
if k.(Registration).IsMatch(category, key, subkey) {
producers := v.(map[string]*Producer)
for _, producer := range producers {
_, found := results[producer.peerInfo.id]
if found == false {
results[producer.peerInfo.id] = struct{}{}
retProducers = append(retProducers, producer)
}
}
}
}
return true
})

return retProducers
}

func (r *RegistrationDB) LookupRegistrations(id string) Registrations {
r.RLock()
defer r.RUnlock()
results := Registrations{}
for k, producers := range r.registrationMap {
r.registrationMap.Range(func(k, v interface{}) bool {
producers := v.(map[string]*Producer)
if _, exists := producers[id]; exists {
results = append(results, k)
results = append(results, k.(Registration))
}
}

return true
})
return results
}

Expand Down

0 comments on commit 7aa646f

Please sign in to comment.