万字长文解析 etcd 如何实现 watch 机制?
ztj100 2024-10-28 21:10 32 浏览 0 评论
你好,我是 aoho,今天我和你分享的主题是 etcd watch:etcd 如何实现 watch 机制?
etcd v2 和 v3 版本之间的重要变化之一就是 watch 机制的优化。etcd v2 watch 机制采用的是基于 HTTP/1.x 协议的客户端轮询机制,历史版本存储则是通过滑动窗口。在大量的客户端连接的场景或者集群规模较大的场景,导致 etcd 服务端的扩展性和稳定性都无法保证。etcd v3 在此基础上进行优化,满足了 Kubernetes pods 部署和状态管理等业务场景诉求。
watch 是监听一个或一组 key,key 的任何变化都会发出消息。某种意义上讲,etcd 就是发布订阅模式。
Watch 的用法
在具体将讲解 Watch 的实现方式之前,我们先来体验下如何使用 Watch。通过 etcdctl 命令行工具实现键值对的检测:
$ etcdctl put hello aoho
$ etcdctl put hello boho
$ etcdctl watch hello -w=json --rev=1
{
"Header": {
"cluster_id": 14841639068965178418,
"member_id": 10276657743932975437,
"revision": 4,
"raft_term": 4
},
"Events": [{
"kv": {
"key": "aGVsbG8=",
"create_revision": 3,
"mod_revision": 3,
"version": 1,
"value": "YW9obw=="
}
}, {
"kv": {
"key": "aGVsbG8=",
"create_revision": 3,
"mod_revision": 4,
"version": 2,
"value": "Ym9obw=="
}
}],
"CompactRevision": 0,
"Canceled": false,
"Created": false
}
依次在命令行中输入上面三条命令,前面两条依次更新 hello 对应的值,第三条命令监测键为 hello 的变化,并指定版本号从 1 开始。结果输出了两条 watch 事件。我们接着在另一个命令行继续输入如下的更新命令:
$ etcdctl put hello coho
可以看到前一个命令行输出了如下的内容:
{
"Header": {
"cluster_id": 14841639068965178418,
"member_id": 10276657743932975437,
"revision": 5,
"raft_term": 4
},
"Events": [{
"kv": {
"key": "aGVsbG8=",
"create_revision": 3,
"mod_revision": 5,
"version": 3,
"value": "Y29obw=="
}
}],
"CompactRevision": 0,
"Canceled": false,
"Created": false
}
命令行输出的事件表明,键 hello 对应的键值对发生了更新,并输出了事件的详细信息。如上就是通过 etcdctl 客户端工具实现 watch 指定的键值对功能。接着我们看下,clientv3 中是如何实现 watch 功能。
func testWatch() {
s := newWatchableStore()
w := s.NewWatchStream()
w.Watch(start_key: foo, end_key: nil)
w.Watch(start_key: bar, end_key: nil)
for {
consume := <- w.Chan()
}
}
etcd 的 mvcc 模块对外提供了两种访问键值对的实现,一种是键值存储 kvstore,另一种是 watchableStore。它们都实现了 KV 接口,KV 接口的具体实现则是 store 结构体。在上面的实现中,我们先调用了 watchableStore。
当我们要使用 Watch 功能时,我们创建了一个 watchStream。创建出来的 w 可以监听的键为 hello,之后我们就可以消费 w.Chan() 返回的 channel。键为 hello 的任何变化,都会通过这个 channel 发送给客户端。
可以看到 watchStream 实现了在大量 kv 的变化中,过滤出当前所监听的 key,将 key 的变化输出。
watchableStore 存储
在前面的课时已经介绍过 kvstore,这里我们介绍 watchableStore 的实现。Watch 的实现是在 store 上封装了一层叫做 watchableStore,重写了 store 的 Write 方法。
// 位于 mvcc/watchable_store_txn.go:22
func (tw *watchableStoreTxnWrite) End() {
changes := tw.Changes()
if len(changes) == 0 {
tw.TxnWrite.End()
return
}
rev := tw.Rev() + 1
evs := make([]mvccpb.Event, len(changes))
for i, change := range changes {
evs[i].Kv = &changes[i]
if change.CreateRevision == 0 {
evs[i].Type = mvccpb.DELETE
evs[i].Kv.ModRevision = rev
} else {
evs[i].Type = mvccpb.PUT
}
}
// end write txn under watchable store lock so the updates are visible
// when asynchronous event posting checks the current store revision
tw.s.mu.Lock()
tw.s.notify(rev, evs)
tw.TxnWrite.End()
tw.s.mu.Unlock()
}
type watchableStoreTxnWrite struct {
TxnWrite
s *watchableStore
}
func (s *watchableStore) Write(trace *traceutil.Trace) TxnWrite {
return &watchableStoreTxnWrite{s.store.Write(trace), s}
}
通过 MVCC 中介绍,store 的任何写操作,都需要 Write 方法返回的 TxnWrite。所以这里重写 Write 方法意味着任何写操作都会经过 watchableStore。从上面的代码不难看出,watchableStoreTxnWrite 在事务提交时,先将本次变更 changes 打包成 Event,然后调用 notify 来将变更通知出去。最后真正提交事务 TxnWrite.End()。
Watch 负责了注册、管理以及触发 Watcher 的功能。我们先来看一下这个结构体的各个字段:
// 位于 mvcc/watchable_store.go:47
type watchableStore struct {
*store
// 同步读写锁
mu sync.RWMutex
// 被阻塞在 watch channel 中的 watcherBatch
victims []watcherBatch
victimc chan struct{}
// 未同步的 watchers
unsynced watcherGroup
// 已同步的 watchers
synced watcherGroup
stopc chan struct{}
wg sync.WaitGroup
}
每一个 watchableStore 其实都组合了来自 store 结构体的字段和方法,除此之外,还有两个 watcherGroup 类型的字段,watcherGroup 管理多个 watcher,能够根据 key 快速找到监听该 key 的一个或多个 watcher。其中 unsynced 用于存储未同步完成的实例,synced 用于存储已经同步完成的实例。
根据 watchableStore 的定义,我们可以描述 Watch 监听的过程。
watchableStore 收到了所有 key 的变更后,将这些 key 交给 synced(watchGroup),synced 能够快速地从所有 key 中找到监听的 key。将这些 key 发送给对应的 watcher,这些 watcher 再通过 chan 将变更信息发送出去。
synced 是怎么快速找到符合条件的 key 呢?etcd 中使用了 map 和 adt(红黑树)来实现。
不单独使用 map 是因为 watch 可以监听一个范围的 key。如果只监听一个 key:
watch(start_key: foo, end_key: nil)
则对应的存储为 map[key]*watcher。这样可以根据 key 快速找到对应的 watcher,etcd 也是这样做的。但对于一组 key 呢?
watch(start_key: foo, end_key: fop)
这里我监听了从 foo->fop 之间的所有 key,理论上这些 key 的数目是无限的,所以无法再使用 map。比如:key=fooac 也属于监听范围。etcd 用 adt 来存储这种 key。
// 位于 mvcc/watcher_group.go:147
// watcherGroup 是由一系列范围 watcher 组织起来的 watchers
type watcherGroup struct {
// keyWatchers has the watchers that watch on a single key
keyWatchers watcherSetByKey
// ranges has the watchers that watch a range; it is sorted by interval
ranges adt.IntervalTree
// watchers is the set of all watchers
watchers watcherSet
}
adt 的实现这里不做介绍,只用知道 adt 能够根据 key=fooac 快速地找到所属范围 foo->fop。在找到 watcher 后,调用 watcher 的 send() 方法,将变更的 Event 发送出去。
syncWatchers 同步监听
在初始化一个新的 watchableStore 时,etcd 会创建一个用于同步 watcherGroup 的 Goroutine,在 syncWatchersLoop 这个循环中会每隔 100ms 调用一次 syncWatchers 方法,将所有未通知的事件通知给所有的监听者,这可以说是整个模块的核心:
// 位于 mvcc/watchable_store.go:334
func (s *watchableStore) syncWatchers() int {
s.mu.Lock()
defer s.mu.Unlock()
if s.unsynced.size() == 0 {
return 0
}
s.store.revMu.RLock()
defer s.store.revMu.RUnlock()
// in order to find key-value pairs from unsynced watchers, we need to
// find min revision index, and these revisions can be used to
// query the backend store of key-value pairs
curRev := s.store.currentRev
compactionRev := s.store.compactMainRev
wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
minBytes, maxBytes := newRevBytes(), newRevBytes()
revToBytes(revision{main: minRev}, minBytes)
revToBytes(revision{main: curRev + 1}, maxBytes)
// UnsafeRange returns keys and values. And in boltdb, keys are revisions.
// values are actual key-value pairs in backend.
tx := s.store.b.ReadTx()
tx.RLock()
revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
var evs []mvccpb.Event
evs = kvsToEvents(s.store.lg, wg, revs, vs)
tx.RUnlock()
var victims watcherBatch
wb := newWatcherBatch(wg, evs)
for w := range wg.watchers {
w.minRev = curRev + 1
eb, ok := wb[w]
if !ok {
// bring un-notified watcher to synced
s.synced.add(w)
s.unsynced.delete(w)
continue
}
if eb.moreRev != 0 {
w.minRev = eb.moreRev
}
if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev}) {
pendingEventsGauge.Add(float64(len(eb.evs)))
} else {
if victims == nil {
victims = make(watcherBatch)
}
w.victim = true
}
if w.victim {
victims[w] = eb
} else {
if eb.moreRev != 0 {
// stay unsynced; more to read
continue
}
s.synced.add(w)
}
s.unsynced.delete(w)
}
s.addVictim(victims)
vsz := 0
for _, v := range s.victims {
vsz += len(v)
}
slowWatcherGauge.Set(float64(s.unsynced.size() + vsz))
return s.unsynced.size()
}
简化后的 syncWatchers 方法中总共做了三件事情,首先是根据当前的版本从未同步的 watcherGroup 中选出一些待处理的任务,然后从 BoltDB 中取当前版本范围内的数据变更并将它们转换成事件,事件和 watcherGroup 在打包之后会通过 send 方法发送到每一个 watcher 对应的 Channel 中。
客户端监听事件
客户端监听键值对时,调用的正是 Watch 方法,Watch 在 stream 中创建一个新的 watcher,并返回对应的 WatchID。
// 位于 mvcc/watcher.go:108
func (ws *watchStream) Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error) {
// 防止出现 ket>= end 的错误范围情况
if len(end) != 0 && bytes.Compare(key, end) != -1 {
return -1, ErrEmptyWatcherRange
}
ws.mu.Lock()
defer ws.mu.Unlock()
if ws.closed {
return -1, ErrEmptyWatcherRange
}
if id == AutoWatchID {
for ws.watchers[ws.nextID] != nil {
ws.nextID++
}
id = ws.nextID
ws.nextID++
} else if _, ok := ws.watchers[id]; ok {
return -1, ErrWatcherDuplicateID
}
w, c := ws.watchable.watch(key, end, startRev, id, ws.ch, fcs...)
ws.cancels[id] = c
ws.watchers[id] = w
return id, nil
}
AutoWatchID 是 WatchStream 中传递的观察者 ID。当用户没有提供可用的 ID 时,如果有传递该值,etcd 将自动分配一个 ID。如果传递的 ID 已经存在,则会返回 ErrWatcherDuplicateID 错误。watchable_store.go 中的 watch 实现是监听的具体实现,实现代码如下:
// 位于 mvcc/watchable_store.go:120
func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) {
// 构建 watcher
wa := &watcher{
key: key,
end: end,
minRev: startRev,
id: id,
ch: ch,
fcs: fcs,
}
s.mu.Lock()
s.revMu.RLock()
synced := startRev > s.store.currentRev || startRev == 0
if synced {
wa.minRev = s.store.currentRev + 1
if startRev > wa.minRev {
wa.minRev = startRev
}
}
if synced {
s.synced.add(wa)
} else {
slowWatcherGauge.Inc()
s.unsynced.add(wa)
}
s.revMu.RUnlock()
s.mu.Unlock()
// prometheus 的指标增加
watcherGauge.Inc()
return wa, func() { s.cancelWatcher(wa) }
}
对 watchableStore 进行操作之前,需要加锁。当 etcd 收到客户端的 watch 请求,如果请求携带了 revision 参数,则比较请求的 revision 和 store 当前的 revision,如果大于当前 revision,则放入 synced 组中,否则放入 unsynced 组。
服务端处理监听
当 etcd 服务启动时,会在服务端运行一个用于处理监听事件的 watchServer gRPC 服务,客户端的 Watch 请求最终都会被转发到这个服务的 Watch 函数中:
// 位于 etcdserver/api/v3rpc/watch.go:140
func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
sws := serverWatchStream{
lg: ws.lg,
clusterID: ws.clusterID,
memberID: ws.memberID,
maxRequestBytes: ws.maxRequestBytes,
sg: ws.sg,
watchable: ws.watchable,
ag: ws.ag,
gRPCStream: stream,
watchStream: ws.watchable.NewWatchStream(),
// chan for sending control response like watcher created and canceled.
ctrlStream: make(chan *pb.WatchResponse, ctrlStreamBufLen),
progress: make(map[mvcc.WatchID]bool),
prevKV: make(map[mvcc.WatchID]bool),
fragment: make(map[mvcc.WatchID]bool),
closec: make(chan struct{}),
}
sws.wg.Add(1)
go func() {
sws.sendLoop()
sws.wg.Done()
}()
errc := make(chan error, 1)
// Ideally recvLoop would also use sws.wg to signal its completion
// but when stream.Context().Done() is closed, the stream's recv
// may continue to block since it uses a different context, leading to
// deadlock when calling sws.close().
go func() {
if rerr := sws.recvLoop(); rerr != nil {
if isClientCtxErr(stream.Context().Err(), rerr) {
sws.lg.Debug("failed to receive watch request from gRPC stream", zap.Error(rerr))
} else {
sws.lg.Warn("failed to receive watch request from gRPC stream", zap.Error(rerr))
streamFailures.WithLabelValues("receive", "watch").Inc()
}
errc <- rerr
}
}()
select {
case err = <-errc:
close(sws.ctrlStream)
case <-stream.Context().Done():
err = stream.Context().Err()
// the only server-side cancellation is noleader for now.
if err == context.Canceled {
err = rpctypes.ErrGRPCNoLeader
}
}
sws.close()
return err
}
当客户端想要通过 Watch 结果监听某一个 Key 或者一个范围的变动,在每一次客户端调用服务端上述方式都会创建两个 Goroutine,其中一个协程会负责向监听者发送数据变动的事件,另一个协程会负责处理客户端发来的事件。
服务端 recvLoop
recvLoop 协程主要用来负责处理客户端发来的事件。
// 位于 etcdserver/api/v3rpc/watch.go:216
func (sws *serverWatchStream) recvLoop() error {
for {
req, err := sws.gRPCStream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
switch uv := req.RequestUnion.(type) {
case *pb.WatchRequest_CreateRequest:
if uv.CreateRequest == nil {
break
}
creq := uv.CreateRequest
if len(creq.Key) == 0 {
// \x00 is the smallest key
creq.Key = []byte{0}
}
if len(creq.RangeEnd) == 0 {
// force nil since watchstream.Watch distinguishes
// between nil and []byte{} for single key / >=
creq.RangeEnd = nil
}
if len(creq.RangeEnd) == 1 && creq.RangeEnd[0] == 0 {
// support >= key queries
creq.RangeEnd = []byte{}
}
if !sws.isWatchPermitted(creq) {
wr := &pb.WatchResponse{
Header: sws.newResponseHeader(sws.watchStream.Rev()),
WatchId: creq.WatchId,
Canceled: true,
Created: true,
CancelReason: rpctypes.ErrGRPCPermissionDenied.Error(),
}
select {
case sws.ctrlStream <- wr:
case <-sws.closec:
}
return nil
}
filters := FiltersFromRequest(creq)
wsrev := sws.watchStream.Rev()
rev := creq.StartRevision
if rev == 0 {
rev = wsrev + 1
}
id, err := sws.watchStream.Watch(mvcc.WatchID(creq.WatchId), creq.Key, creq.RangeEnd, rev, filters...)
if err == nil {
sws.mu.Lock()
if creq.ProgressNotify {
sws.progress[id] = true
}
if creq.PrevKv {
sws.prevKV[id] = true
}
if creq.Fragment {
sws.fragment[id] = true
}
sws.mu.Unlock()
}
wr := &pb.WatchResponse{
Header: sws.newResponseHeader(wsrev),
WatchId: int64(id),
Created: true,
Canceled: err != nil,
}
if err != nil {
wr.CancelReason = err.Error()
}
select {
case sws.ctrlStream <- wr:
case <-sws.closec:
return nil
}
case *pb.WatchRequest_CancelRequest:
if uv.CancelRequest != nil {
id := uv.CancelRequest.WatchId
err := sws.watchStream.Cancel(mvcc.WatchID(id))
if err == nil {
sws.ctrlStream <- &pb.WatchResponse{
Header: sws.newResponseHeader(sws.watchStream.Rev()),
WatchId: id,
Canceled: true,
}
sws.mu.Lock()
delete(sws.progress, mvcc.WatchID(id))
delete(sws.prevKV, mvcc.WatchID(id))
delete(sws.fragment, mvcc.WatchID(id))
sws.mu.Unlock()
}
}
case *pb.WatchRequest_ProgressRequest:
if uv.ProgressRequest != nil {
sws.ctrlStream <- &pb.WatchResponse{
Header: sws.newResponseHeader(sws.watchStream.Rev()),
WatchId: -1, // response is not associated with any WatchId and will be broadcast to all watch channels
}
}
default:
// we probably should not shutdown the entire stream when
// receive an valid command.
// so just do nothing instead.
continue
}
}
}
在用于处理客户端的 recvLoop 方法中调用了 mvcc 模块暴露出的 watchStream.Watch 方法,该方法会返回一个可以用于取消监听事件的 watchID;当 gRPC 流已经结束后者出现错误时,当前的循环就会返回,两个 Goroutine 也都会结束。
服务端 sendLoop
如果出现了更新或者删除事件,就会被发送到 watchStream 持有的 Channel 中,而 sendLoop 会通过 select 来监听多个 Channel 中的数据并将接收到的数据封装成 pb.WatchResponse 结构并通过 gRPC 流发送给客户端:
// 位于 etcdserver/api/v3rpc/watch.go:332
func (sws *serverWatchStream) sendLoop() {
// watch ids that are currently active
for {
select {
case wresp, ok := <-sws.watchStream.Chan():
evs := wresp.Events
events := make([]*mvccpb.Event, len(evs))
for i := range evs {
events[i] = &evs[i] }
canceled := wresp.CompactRevision != 0
wr := &pb.WatchResponse{
Header: sws.newResponseHeader(wresp.Revision),
WatchId: int64(wresp.WatchID),
Events: events,
CompactRevision: wresp.CompactRevision,
Canceled: canceled,
}
sws.gRPCStream.Send(wr)
case c, ok := <-sws.ctrlStream: // ...
case <-progressTicker.C: // ...
case <-sws.closec:
return
}
}
}
对于每一个 Watch 请求来说,watchServer 会根据请求创建两个用于处理当前请求的 Goroutine,这两个协程会与更底层的 mvcc 模块协作提供监听和回调功能:
到这里,我们对于 Watch 功能的介绍就差不多结束了,从对外提供的接口到底层的使用的数据结构以及具体实现,其他与 Watch 功能相关的话题可以直接阅读 etcd 的源代码了解更加细节的实现。
Watch 异常场景
上述是正常流程,但是会有很多不正常的情况发生。可以知道,消息都是通过一个 Chan 发送出去,但如果消费者消费速度慢,Chan 就容易堆积。Chan 的空间不可能无限大,那就必然会有满的时候,满了后该怎么办呢?
接下来就要讨论前面小结所提及的 unsynced、victims 数组的作用。首先思考下 Chan 什么时候会满呢?
var (
// chanBufLen is the length of the buffered chan
// for sending out watched events.
// TODO: find a good buf value. 1024 is just a random one that
// seems to be reasonable.
chanBufLen = 1024
// maxWatchersPerSync is the number of watchers to sync in a single batch
maxWatchersPerSync = 512
)
代码中 Chan 的长度是 1024。不过这也是一个随机值,只是没有现在更好的选择。
chan 一旦满了,会发生以下操作:
// 位于 mvcc/watchable_store.go:438
func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
var victim watcherBatch
for w, eb := range newWatcherBatch(&s.synced, evs) {
if eb.revs != 1 {
s.store.lg.Panic(
"unexpected multiple revisions in watch notification",
zap.Int("number-of-revisions", eb.revs),
)
}
if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
pendingEventsGauge.Add(float64(len(eb.evs)))
} else {
// move slow watcher to victims
w.minRev = rev + 1
if victim == nil {
victim = make(watcherBatch)
}
w.victim = true
victim[w] = eb
s.synced.delete(w)
slowWatcherGauge.Inc()
}
}
s.addVictim(victim)
}
notify 通知一个事实,即在给定修订版中的给定事件只是发生在监视事件键的观察者身上。watcher 会记录当前的 Revision,并将自身标记为受损的。此次的变更操作会被保存到 watchableStore 的 victims 中。同时该 watcher 会被从 synced 踢出。
假设此时有一个写操作:foo=f1。而正好 Chan 此时刚满,则监听 foo 的 watcher 将从 synced 中踢出,同时 foo=f1 被保存到 victims 中。
接下来对 foo 的任何变更,该 watcher 都不会记录。那这些消息就都丢掉了吗?当然不是,watcher 变成受损状态时记录下了当时的 Revision,这个很重要。
syncVictimsLoop 清除 victims
在上面的场景中,我们知道,队列满时,当时变更的 Event 被放入了 victims 中。这个协程就会试图清除这个 Event。怎么清除呢?协程会不断尝试让 watcher 发送这个 Event,一旦队列不满,watcher 将这个 Event 发出后。该 watcher 就被划入了 unsycned 中,同时不再是受损状态。
// 位于 mvcc/watchable_store.go:246
// syncVictimsLoop tries to write precomputed watcher responses to
// watchers that had a blocked watcher channel
func (s *watchableStore) syncVictimsLoop() {
defer s.wg.Done()
for {
for s.moveVictims() != 0 {
// try to update all victim watchers
}
s.mu.RLock()
isEmpty := len(s.victims) == 0
s.mu.RUnlock()
var tickc <-chan time.Time
if !isEmpty {
tickc = time.After(10 * time.Millisecond)
}
select {
case <-tickc:
case <-s.victimc:
case <-s.stopc:
return
}
}
}
// moveVictims tries to update watches with already pending event data
func (s *watchableStore) moveVictims() (moved int) {
s.mu.Lock()
victims := s.victims
s.victims = nil
s.mu.Unlock()
var newVictim watcherBatch
for _, wb := range victims {
// try to send responses again
for w, eb := range wb {
// watcher has observed the store up to, but not including, w.minRev
rev := w.minRev - 1
if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
pendingEventsGauge.Add(float64(len(eb.evs)))
} else {
if newVictim == nil {
newVictim = make(watcherBatch)
}
newVictim[w] = eb
continue
}
moved++
}
// assign completed victim watchers to unsync/sync
s.mu.Lock()
s.store.revMu.RLock()
curRev := s.store.currentRev
for w, eb := range wb {
if newVictim != nil && newVictim[w] != nil {
// couldn't send watch response; stays victim
continue
}
w.victim = false
if eb.moreRev != 0 {
w.minRev = eb.moreRev
}
if w.minRev <= curRev {
s.unsynced.add(w)
} else {
slowWatcherGauge.Dec()
s.synced.add(w)
}
}
s.store.revMu.RUnlock()
s.mu.Unlock()
}
if len(newVictim) > 0 {
s.mu.Lock()
s.victims = append(s.victims, newVictim)
s.mu.Unlock()
}
return moved
}
此时 syncWatchersLoop 协程就开始起作用。由于在受损状态下,这个 watcher 已经错过了很多消息。为了追回进度,协程会根据 watcher 保存的 Revision,找出受损之后所有的消息,将关于 foo 的消息全部给 watcher,当 watcher 将这些消息都发送出去后。watcher 就脱离了 unsynced,成为了 synced。
至此就解决了 Chan 满导致的问题。同时也阐明了 Watch 的设计实现。
阅读最新文章,关注我的公众号:aoho求索
相关推荐
- Jquery 详细用法
-
1、jQuery介绍(1)jQuery是什么?是一个js框架,其主要思想是利用jQuery提供的选择器查找要操作的节点,然后将找到的节点封装成一个jQuery对象。封装成jQuery对象的目的有...
- 前端开发79条知识点汇总
-
1.css禁用鼠标事件2.get/post的理解和他们之间的区别http超文本传输协议(HTTP)的设计目的是保证客户机与服务器之间的通信。HTTP的工作方式是客户机与服务器之间的请求-应答协议。...
- js基础面试题92-130道题目
-
92.说说你对作用域链的理解参考答案:作用域链的作用是保证执行环境里有权访问的变量和函数是有序的,作用域链的变量只能向上访问,变量访问到window对象即被终止,作用域链向下访问变量是不被允许的。...
- Web前端必备基础知识点,百万网友:牛逼
-
1、Web中的常见攻击方式1.SQL注入------常见的安全性问题。解决方案:前端页面需要校验用户的输入数据(限制用户输入的类型、范围、格式、长度),不能只靠后端去校验用户数据。一来可以提高后端处理...
- 事件——《JS高级程序设计》
-
一、事件流1.事件流描述的是从页面中接收事件的顺序2.事件冒泡(eventbubble):事件从开始时由最具体的元素(就是嵌套最深的那个节点)开始,逐级向上传播到较为不具体的节点(就是Docu...
- 前端开发中79条不可忽视的知识点汇总
-
过往一些不足的地方,通过博客,好好总结一下。1.css禁用鼠标事件...
- Chrome 开发工具之Network
-
经常会听到比如"为什么我的js代码没执行啊?","我明明发送了请求,为什么反应?","我这个网站怎么加载的这么慢?"这类的问题,那么问题既然存在,就需要去解决它,需要解决它,首先我们得找对导致问题的原...
- 轻量级 React.js 虚拟美化滚动条组件RScroll
-
前几天有给大家分享一个Vue自定义滚动条组件VScroll。今天再分享一个最新开发的ReactPC端模拟滚动条组件RScroll。...
- 一文解读JavaScript事件对象和表单对象
-
前言相信做网站对JavaScript再熟悉不过了,它是一门脚本语言,不同于Python的是,它是一门浏览器脚本语言,而Python则是服务器脚本语言,我们不光要会Python,还要会JavaScrip...
- Python函数参数黑科技:*args与**kwargs深度解析
-
90%的Python程序员不知道,可变参数设计竟能决定函数的灵活性和扩展性!掌握这些技巧,让你的函数适应任何场景!一、函数参数设计的三大进阶技巧...
- 深入理解Python3密码学:详解PyCrypto库加密、解密与数字签名
-
在现代计算领域,信息安全逐渐成为焦点话题。密码学,作为信息保护的关键技术之一,允许我们加密(保密)和解密(解密)数据。...
- 阿里Nacos惊爆安全漏洞,火速升级!(附修复建议)
-
前言好,我是threedr3am,我发现nacos最新版本1.4.1对于User-Agent绕过安全漏洞的serverIdentitykey-value修复机制,依然存在绕过问题,在nacos开启了...
- Python模块:zoneinfo时区支持详解
-
一、知识导图二、知识讲解(一)zoneinfo模块概述...
- Golang开发的一些注意事项(一)
-
1.channel关闭后读的问题当channel关闭之后再去读取它,虽然不会引发panic,但会直接得到零值,而且ok的值为false。packagemainimport"...
- Python鼠标与键盘自动化指南:从入门到进阶——键盘篇
-
`pynput`是一个用于控制和监控鼠标和键盘的Python库...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- idea eval reset (50)
- vue dispatch (70)
- update canceled (42)
- order by asc (53)
- spring gateway (67)
- 简单代码编程 贪吃蛇 (40)
- transforms.resize (33)
- redisson trylock (35)
- 卸载node (35)
- np.reshape (33)
- torch.arange (34)
- npm 源 (35)
- vue3 deep (35)
- win10 ssh (35)
- vue foreach (34)
- idea设置编码为utf8 (35)
- vue 数组添加元素 (34)
- std find (34)
- tablefield注解用途 (35)
- python str转json (34)
- java websocket客户端 (34)
- tensor.view (34)
- java jackson (34)
- vmware17pro最新密钥 (34)
- mysql单表最大数据量 (35)