Skip to content

Latest commit

 

History

History
197 lines (160 loc) · 4.74 KB

2016-05-18-watch_in_etcd.md

File metadata and controls

197 lines (160 loc) · 4.74 KB
layout title date categories tags excerpt
post
Watch API and TTL implementation in etcd
2016-05-18 10:00:30 -0700
Distributed
etcd
Watch API and TTL implementation in etcd

Watch API

Etcd Watch API可以用于当Client监听数据(Key)的变化,从而得到实时通知。

  • notify

当etcd更新一个key时,就会将event暂存,并通知所有正在watch的Watcher(即client):

// Set creates or replace the node at nodePath.
func (s *store) Set(nodePath string, dir bool, value string, expireTime time.Time) (*Event, error) {
...
	// Set new value
	e, err := s.internalCreate(nodePath, dir, value, false, true, expireTime, Set)
...
	s.WatcherHub.notify(e) ///save event

	return e, nil
}


// notify function accepts an event and notify to the watchers.
func (wh *watcherHub) notify(e *Event) {
	e = wh.EventHistory.addEvent(e) // add event into the eventHistory

	segments := strings.Split(e.Node.Key, "/")

	currPath := "/"

	// walk through all the segments of the path and notify the watchers
	// if the path is "/foo/bar", it will notify watchers with path "/",
	// "/foo" and "/foo/bar"

	for _, segment := range segments {
		currPath = path.Join(currPath, segment)
		// notify the watchers who interests in the changes of current path
		wh.notifyWatchers(e, currPath, false)
	}
}
  • watching
func handleWatch(key string, recursive, stream bool, waitIndex string, w http.ResponseWriter, req *http.Request, s Server) error {
	// Create a command to watch from a given index (default 0).
	var sinceIndex uint64 = 0
	var err error

	if waitIndex != "" {
		sinceIndex, err = strconv.ParseUint(waitIndex, 10, 64)
	}

	watcher, err := s.Store().Watch(key, recursive, stream, sinceIndex)

...
	select {
	case <-closeChan:
		watcher.Remove()
	case event := <-watcher.EventChan: ///等待Update事件
		if req.Method == "HEAD" {
			return nil
		}
		b, _ := json.Marshal(event)
		w.Write(b)
	}
}

Client还可以指定Index,watch历史数据:

# curl -s -L 'http://127.0.0.1:4001/v2/keys/dir1/d1/key1' | python -mjson.tool                          
{
    "action": "get",
    "node": {
        "createdIndex": 20024,
        "key": "/dir1/d1/key1",
        "modifiedIndex": 20024,
        "value": "yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy"
    }
}

### watch index=20021后,key1的value:

# curl -s -L 'http://127.0.0.1:4001/v2/keys/dir1/d1/key1?wait=true&waitIndex=20021' | python -mjson.tool
{
    "action": "set",
    "node": {
        "createdIndex": 20023,
        "key": "/dir1/d1/key1",
        "modifiedIndex": 20023,
        "value": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
    },
    "prevNode": {
        "createdIndex": 20020,
        "key": "/dir1/d1/key1",
        "modifiedIndex": 20020,
        "value": "value1"
    }
}

不过,etcd的watch的历史数据最多1000条,即etcd只会保存1000个index的历史event:

func newStore() *store {
	s := new(store)
	s.CurrentVersion = defaultVersion
	s.Root = newDir(s, "/", s.CurrentIndex, nil, "", Permanent)
	s.Stats = newStats()
	s.WatcherHub = newWatchHub(1000) ///1000 history event
	s.ttlKeyHeap = newTtlKeyHeap()
	return s
}

TTL

  • ttlKeyHeap

对于设置有TTL的Key/Value,etcd内部会用一个heap来记录相应的node:

/// implement raft.StateMachine
type store struct {
	Root           *node ///root node
	WatcherHub     *watcherHub
	CurrentIndex   uint64
	Stats          *Stats
	CurrentVersion int
	ttlKeyHeap     *ttlKeyHeap  // need to recovery manually
	worldLock      sync.RWMutex // stop the world lock
}

func (s *store) internalCreate(nodePath string, dir bool, value string, unique, replace bool,
	expireTime time.Time, action string) (*Event, error) {
...
	// node with TTL
	if !n.IsPermanent() {
		s.ttlKeyHeap.push(n)

		eNode.Expiration, eNode.TTL = n.ExpirationAndTTL()
	}

	s.CurrentIndex = nextIndex
}
  • SyncCommand

leader每隔500ms就会执行一次SyncCommand命令,删除过期的node:

func (s *PeerServer) monitorSync() {
	ticker := time.NewTicker(time.Millisecond * 500)
	defer ticker.Stop()
	for {
		select {
		case <-s.closeChan:
			return
		case now := <-ticker.C:
			if s.raftServer.State() == raft.Leader {
				s.raftServer.Do(s.store.CommandFactory().CreateSyncCommand(now)) ///执行SyncCommand命令
			}
		}
	}
}

func (c SyncCommand) Apply(context raft.Context) (interface{}, error) {
	s, _ := context.Server().StateMachine().(store.Store)
	s.DeleteExpiredKeys(c.Time)

	return nil, nil
}

// deleteExpiredKyes will delete all
func (s *store) DeleteExpiredKeys(cutoff time.Time) {
	s.worldLock.Lock()
	defer s.worldLock.Unlock()

	for {
		node := s.ttlKeyHeap.top()
		if node == nil || node.ExpireTime.After(cutoff) { ///node过期
			break
		}
...