原创 Delighted 2025-03-02 09:02 重庆
点击关注公众号,“技术干货” 及时达!
摘要
CRDT(无冲突复制数据类型)是一种能在网络多台计算机上复制的数据结构,副本可独立并行更新且无冲突,最终能达成一致。CRDT 可在多用户同时操作同一页面时保证一致性,支持离线使用和分区容错,能很好地权衡 CAP 定理中的一致性、可用性和分区容错性。
尽管 CRDT 提供了理论上的基础,但具体的实现和应用并不是简单的任务。使用第三方库可以大大简化应用 CRDT 的过程,确保实现的正确性和效率。本文将在初始CRDT的基础上继续来深入了解开发过程中实现协同的第三方库Yjs,围绕Yjs的底层结构、解决冲突、删除机制、Undo/Redo以及网络几部分进行讲解,再了解完底层原理后可以尝试使用react+Yjs实现简单的协同编辑demo。
Yjs 结合了 「CRDT 的自动合并能力」、「去中心化架构」和「高效增量同步」,解决了传统 OT 方案对中心化服务器的依赖和冲突处理的复杂性,同时提供了强大的离线支持和扩展性,完美契合现代分布式协作应用的需求,因此成为主流选择。
❝传统 OT 方案是在服务器收集多个用户的操作后,进行排序处理,然后按照这个顺序将操作再同步给其它用户,而Yjs是将其他用户操作传递给本地,在客户端进行冲突处理,也无需注重传递的先后顺序。
❞
Yjs是一种高性能的基于操作的CRDT( operation-based CRDT),常用于构建自动同步的协作应用程序。它将其内部 CRDT 模型公开为可并发操作的共享数据类型。共享数据类型通常有类似于Map或者常见数据类型Array等(即YMap、YArray、YText等,统称为YModel),在这些数据类型发生改变时,会触发事件,并自动合并且不会发生合并冲突,来实现「最终一致性」。
Yjs的建模数据结构
Yjs 在工程上建模 CRDT 时所用的基础数据结构:「双向链表」。
在 Yjs 中,不论是 YText、YArray 还是 YMap,这些 YModel 实例中的数据都存储在一条双向链表里。粗略地说,「这条链表中的每一项(或者说每个 Item)都唯一地记录了某次用户操作所修改的数据」,某种程度上和区块链有些异曲同工。可以认为对 YModel 的操作,最后都会转为对这条链表的 append、insert、split、merge 等结构变换。链表中每个 item 会被序列化编码后分发,而基于 CRDT 算法的保证,「只要每个客户端最终都能接收到全部的 item,那么不论客户端以何种顺序接收到这些 Item,它们都能重建出完全一致的文档状态(强最终一致性)」 。
「Item的核心数据结构:」
class Item {
constructor(id, origin, rightOrigin, parent, parentSub, content) {
this.id = id; // 唯一标识符 { client, clock }
this.origin = origin; // 当前操作的左侧位置引用
this.rightOrigin = rightOrigin; // 当前操作的右侧位置引用
this.parent = parent; // 父节点引用
this.parentSub = parentSub; // 子引用
this.content = content; // 实际内容
this.left = null; // 链表中左侧节点
this.right = null; // 链表中右侧节点
}
}
「ID(clientID,clock)」
在多人协作的过程中,Yjs会为每个item分配一个唯一ID来确定其在逻辑时间轴的位置上。
class ID {
constructor (client: number, clock: number) {
// 客户端的唯一 ID
this.client = client
// 逻辑时间戳
this.clock = clock
}
}
更新逻辑为:
发生本地事件时,localClock += 1
。
在接收到远程事件时,localClock = max(remoteClock, localClock) + 1
。
这个逻辑相对于上文《初始CRDT》提到的check_state
,确保了任意两个item之间在逻辑上的先后关系。
「content」
content 指 Item 所存储的数据(比如对应插入的字符串),存储了当前 Item 对应的基础类型或者复合类型;
「position」
left,right(Yjs 的 CRDT 数据结构基于双向链表,依赖于此);origin,originRight,这两项属性是为了解决冲突;parent,parentSub,这两项是为了支持 Yjs 的灵活的嵌套数据结构;
「parent」 属性一般用于标识某个 Item 所属的父级对象。
「parentSub」 属性用于标识当前 Item 在其父对象中的键(key)。通常用于YMap结构中,对于YArray、YText等结构,他们均可通过下标来获取准确位置。
// 创建一个新的 Yjs 文档
const doc = new Y.Doc();
// 创建一个 YMap
const yMap = doc.getMap('exampleMap');
// 插入元素到 YMap 中
const yText = new Y.Text('Hello, world!');
yMap.set('greeting', yText);
console.log(yText.parent === yMap); // 输出: true
console.log(yText.parentSub); // 输出: 'greeting'
以将itemB插入至itemA和itemC为例:
item是如何存储的呢?
由于任意 item 均可用 ID 来排序,因此一种选择是使用 B 树这样具备对数级优良插入和删除时间复杂度的数据结构,将所有的 item 维护在一棵平衡二叉树中。但实践中 Yjs 选择了一种更简单直接的方案,「即为每个 client 分配一个扁平的 item 数组」,相应的结构被称为「StructStore」:
doc.store.clients: {
1111111: [Item, Item, Item], // ...
2222222: [Item, Item, Item] // ...
}
每个item数组都是按clock字段的值来排序的,这样只需要做一次二分查找,就能快速找到某个ID对应的item。而在接收到远程 item 时,Yjs 除了将 item 插入链表,也会将它插入 StructStore 中的相应位置。
因此我们可以认为,Yjs 中存在两种索引数据的方式:
依据文档结构顺序(即 document order)建模的「双向链表」。
依据逻辑时序(即 insertion order)建模的 「StructStore」。
解决并发冲突(Yata算法)
「什么是冲突呢?」
冲突(Conflict)是指当多个用户同时对同一文档的同一位置进行独立操作时,这些操作因顺序或逻辑矛盾导致文档状态无法直接合并的现象。
「冲突的核心原因」
「并发性」:操作在不同客户端独立产生,网络延迟导致操作到达服务器的顺序不确定。
「位置重叠」:多个操作作用于相同逻辑位置(如插入到同一字符后)。
「缺乏全局时序」:没有统一的时钟或顺序保证所有客户端对操作的理解一致。
「YATA 算法」背后的核心思想是通过维护操作的顺序和冲突解决机制,确保所有并发修改能够正确合并,并保持文档在所有副本之间的一致性。当存在新Item 插入、现有Item 更新或删除时执行integrate(transaction,offset)
方法,使得Yjs 能够处理不同客户端的并发操作,并确保最终文档状态一致。
「Y A T A !」 这几个字符每个都对应一个 item(或者说一次字符插入的 operation)。它们通过 left 和 right 字段连接在一起。在插入新字符 T
的时候,Yjs 就会「根据 item 的 ID(逻辑时间戳) 在链表中查找合适的插入位置,将新字符对应的 item 接入链表中。」 接下来根据源码来看integrate(transaction,offset)
是如何实现?
tips:「Transaction(事物)」 在 Yjs 中是用于管理和封装一系列对文档的更改操作,一个Transaction关联一个Y.doc,采用structStore存储其对应的操作历史、状态同步。
更新时钟和分割内容(处理偏移量)
if (offset > 0) {
this.id.clock += offset;
this.left = getItemCleanEnd(transaction, transaction.doc.store, createID(this.id.client, this.id.clock - 1));
this.origin = this.left.lastId;
this.content = this.content.splice(offset);
this.length -= offset;
}
offset偏移量,用于准确确定用户插入或删除的位置。当存在偏移量的时候,会根据偏移量更新clock逻辑时间戳,然后找到新位置下的文档结构的左侧item节点并更新origin为它的ID,最后将内容分割(分割内容为原内容从 offset
位置到结束位置)。
处理冲突(parent存在)
判断冲突
<!---->
if ((!this.left && (!this.right || this.right.left !== null)) || (this.left && this.left.right !== this.right))
该条件用于检查当前 Item 对象(即 this)的位置是否存在不一致或潜在的冲突。
「情况1:当前」 「item」 「没有左邻接,存在右邻接」
假设初始链表为B -> C
,用户1在开头插入A,但由于用户1网络延迟,此时用户2已经在开头插入了D。
最终链表结构:
D -> B -> C
/
A
产生的可能原因:头部插入延迟、同步丢失或被覆盖、删除和插入操作并发。
「情况2:当前」 「Item」 「没有左右邻接」 「Item」
在一个初始为空的数据结构中插入第一个元素。
产生的可能原因:插入在空文档或者是对于YMap的操作。
「情况3:当前」 「Item」 「有左邻接」 「Item」 「,但右连接不一致」
假设初始链表为A -> B -> C
,用户1和用户2几乎同时在B和C之间分别插入D和E。
A -> B -> D -> C A -> B -> E -> C
// 此时链表连接关系是:// 此时链表连接关系是:
B.right = D B.right = E
D.left = B E.left = B
D.right = C E.right = C
C.left = D C.left = E
并发冲突处理
A -> B -> D -> E -> C
// 合并后的链表连接关系是:
B.right = D
D.left = B
D.right = E
E.left = D
E.right = C
C.left = E
对于Item(E)来说this.left.right = D
this.right = C
,造成冲突。
产生的可能原因:不同用户同时在相同位置插入的冲突。
冲突与位置检测
<!---->
let left = this.left
let o
if (left !== null) {
o = left.right
} else if (this.parentSub !== null) {
o =(this.parent)._map.get(this.parentSub) || null
while (o !== null && o.left !== null) {
o = o.left
}
} else {
o =(this.parent)._start
}
const conflictingItems = new Set()
const itemsBeforeOrigin = new Set()
left被初始化为当前 Item 的左邻接。
「o」 是一个中间变量,用于查找并记录寻找潜在冲突的Item。
「conflictingItems:」 用于记录与 this 存在冲突的 Item。
「itemsBeforeOrigin:」 用于记录追踪遍历过程中所有遇到的 Item。
while (o !== null && o !== this.right) {
itemsBeforeOrigin.add(o)
conflictingItems.add(o)
if (compareIDs(this.origin, o.origin)) {
if (o.id.client < this.id.client) {
left = o
conflictingItems.clear()
} else if (compareIDs(this.rightOrigin, o.rightOrigin)) {
break
}
} else if (o.origin !== null && itemsBeforeOrigin.has(getItem(transaction.doc.store, o.origin))) {
if (!conflictingItems.has(getItem(transaction.doc.store, o.origin))) {
left = o
conflictingItems.clear()
}
} else {
break
}
o = o.right
}
this.left = left
while 循环遍历文档从 o 开始到 this.right 位置的「所有项」,其中 o 最初被设置为 left.right(即 left 后的一个 Item)。目的是找到正确的插入位置并解决可能的冲突。
将正在观察的o添加到conflictingItems、itemsBeforeOrigin,再根据origin是否相同进一步比较ID来决定位置。
冲突检测
如果 「o.id.client 的值小于 this.id.client」,则 o 更早插入,因此更新 left 为 o并清空 conflictingItems 集合。
如果 o 和 this 指向同一右边界 rightOrigin,则根据 ID 决定位置后退出循环。
检测其他潜在冲突
如果 o.origin 非空且 itemsBeforeOrigin 包含该 origin,检查 conflictingItems是否不包含 o.origin。如果不包含,则更新 left,清空 conflictingItems。
循环结束后,更新this.left。
以下情况对应上文三种情况的案例:
「情况1:当前」 「Item」 「没有左邻接,存在右邻接」
进行插入A时
首先,由于left 为null,且不为YMap结构,确定o 的值为(this.parent)._start
,确定D与A存在冲突。
进入while循环,并将o 添加到conflictingItems、itemsBeforeOrigin,this.origin和o.origin都为null,假设用户1的clientId小于用户2,且compareIDs(this.rightOrigin, o.rightOrigin)
为true,跳出循环,left为null,即确定了插入位置。
「情况2:当前Item没有左右邻接」
向初始为空的数据结构中插入不存在位置冲突。
「情况3:当前」 「Item」 「有左邻接」 「Item」 「,但右连接不一致」
用户2插入E时
left为B,由于用户1稍快一点插入,o=left.right=D ,确定D和E存在冲突。
进入while循环,从D开始,并将o 添加到conflictingItems、itemsBeforeOrigin,this.origin和o.origin都为B,假设用户1的clientId小于用户2,则更新left为D,o 为C,此时不满足o !== this.right
不再进行循环,最终确定left为D。
「情况4(特殊情况):采用YMap结构时,并发插入」
由于YMap主要以键值对的方式存储,YMap 的操作依赖于键来确定其内部数据的存储和查找方式,而不依赖于链表的顺序,所以不存在偏移量的概念,在 YMap 中插入新的键值对时,不会默认设置 left 和 right 指针,并且也没有origin和rightorigin 的概念。
假设用户1和用户2并发插入,并且用户1的clientId小于用户2。
const ydoc = new Y.Doc()
const ymap = ydoc.getMap('Map')
//用户1
ymap.set('key1', 'value1');
//用户2
ymap.set('key1', 'value2');
当用户2插入时,由于存在parentSub,所以o 被确认为value1 ,即value1 与value2 存在冲突。
进入while循环,从value1开始,并将o 添加到conflictingItems、itemsBeforeOrigin,this.origin和o.origin都为null,且满足o.id.client < this.id.client
,则left 为value1,随后o 取下一个值为null,跳出循环。
链表结构更新
if (this.left !== null) {
const right = this.left.right;
this.right = right;
this.left.right = this;
} else {
let r;
if (this.parentSub !== null) {
r =(this.parent)._map.get(this.parentSub) || null;
while (r !== null && r.left !== null) {
r = r.left;
}
} else {
r =(this.parent)._start;
(this.parent)._start = this;
}
this.right = r;
}
if (this.right !== null) {
this.right.left = this;
} else if (this.parentSub !== null) {
(this.parent)._map.set(this.parentSub,this);
if (this.left !== null) {
this.left.delete(transaction);
}
}
if (this.parentSub === null && this.countable && !this.deleted) {
(this.parent)._length += this.length;
}
if (
((this.parent)._item !== null &&(this.parent)._item.deleted) ||
(this.parentSub !== null && this.right !== null)
) {
this.delete(transaction);
}
如果 this.left 不为空,则将当前项 (this) 插入到文档中,通过调整 left 和 right 的指针来达成。
如果 left 为空(即当前项可能是列表的新起始项),则需要查找合适的 right 项,并更新 parent 的起始项。
如果 this.right 不为空,即当前 Item 有一个在其后面需要直接来连的项,则设置那个后续项的 left 属性指向当前 Item。这样确保链表从左到右和从右到左的链接都是完整的。
如果Item 没有右邻接项(this.right 为 null),同时 this 位于YMap结构中(有 parentSub)的情况。更新当前 parentSub,如果存在left将其删除。(类似于其中一个用户试图删除一个元素,而另一个用户几乎同时在相同的位置尝试添加或更新一个元素。)
调整父节点的长度parent,反映新增加的 Item 的长度。
「this.parentSub === null」:检查当前 Item 是否没有子项标识符 (parentSub)。这通常意味着当前 Item 不是存储在父 Item 的映射 (_map) 中,而是直接作为子 Item存在。
「this.countable」:检查当前 Item是否应该被计入 length 总和。countable 是一个布尔值,指示是否应将此 Item 的长度计入总长度统计。这通常用于忽略那些不应计入长度统计的元素(比如一些中间状态或特定数据结构的元素)。
「!this.deleted」:确保当前 Item 没有被标记为已删除。我们只想为尚未删除的 Item 增加父 Item 的长度计数。
删除操作:
检查父项是否已被删除:如果 this.parent._item
不为空且标记为已删除,则需要将当前 Item 也删除。
当前元素不是父映射的当前值:如果当前元素通过键(parentSub)与父元素关联,而 this.right 非空说明在映射中它后面还有其他元素,这可能意味着它被更新的元素覆盖或不再是该键的最新映射值。
「情况1:当前Item没有左邻接,存在右邻接」
由上面确定left 为null,所以定义r,且不为YMap结构,所以r为A,结构的开头更新为了插入的D,而A变成了B.right ,即最终结果为A -> D -> B -> C
「情况2:当前Item没有左右邻接」
由于left 和right 都为null,r被更新为null,自己本身就为数据结构的start
。
「情况3:当前」 「Item」 「有左邻接,但右连接不一致」
确定left为D,则定义right为D.right,即C,并更新this.right 为C,再更新D.right为this,最后更新C.right为this,冲突解决后的结果为A -> B -> D -> E -> C
「情况4:采用YMap结构时,并发插入」
确定this.left 为value1 ,则this.right 为null,又因为parentSub 为key1 ,执行this.left.delete(transaction)
将value1删除,最终ymap为{"key1":"value2"}
数据操作和更新
addStruct(transaction.doc.store, this);
this.content.integrate(transaction, this);
addChangedTypeToTransaction(
transaction,
(this.parent),
this.parentSub
);
addStruct
函数负责将 Item 正式注册到文档的数据结构(即structStore)中,使其能够被系统其他部分识别和处理。
integrate
方法在这里被用于将当前 Item的内容(如文本、列表项等)集成到整个文档中。这不仅涉及物理地将内容添加到数据存储中,还包括处理任何必要的冲突解决和数据合并。
addChangedTypeToTransaction
这一步骤记录了对 this.parent 对象的更改,其中 this.parentSub
是当前 Item 在其父对象中的索引或键。这样做的目的是为了在事务完成时,系统能够追踪哪些部分发生了修改,以便将这些更改同步到所有协作端。
特殊情况处理(无parent)
「GC」 「(Garbage Collector,垃圾收集器)」 对象是一种用于处理无效或孤立项目的垃圾收集。垃圾收集器的主要作用是确保这些无效的、逻辑上已删除的元素不会保留在内存中占用资源,并保证数据结构的整洁和一致性。
new GC(this.id, this.length).integrate(transaction, 0);
当存在无parent属性的Item时,就会创建一个GC对象用于处理它,并调用它「自身的integrate方法」用于将此次垃圾收集操作添加到当前的事物当中。
class GC {
constructor (id, length) {
this.id = id
this.length = length
}
get deleted () {
return true
}
delete () {}
integrate (transaction, offset) {
if (offset > 0) {
this.id.clock += offset
this.length -= offset
}
addStruct(transaction.doc.store, this)
}
}
从源码中看出,GC会将此次操作的deleted属性标记为true,并将其添加至StructStore当中。
双缓存删除机制
双缓存删除机制是一种删除管理和垃圾回收优化技术。它通过维护两个不同的缓存来跟踪和管理被删除的内容:DeleteSet
和StateVector
。
「DeleteSet」 :是一个集合,用于跟踪被删除的对象。删除集合中通常包含删除操作的其他属性,例如对象删除的时间戳或长度等。
「StateVector」:是一个向量结构,表示当前文档的状态。对于每个客户端,状态向量记录了它所持有数据的最新版本。这使得系统能够快速识别哪些数据已经同步,哪些数据需要更新。
「执行逻辑」
「删除标记和确认:」 当一个客户端发起删除操作时,它会更新自己的DeleteSet并将删除操作广播给其他客户端。
「同步状态:」 接收到删除操作的客户端会检查自己的 StateVector,以决定是否已经处理过该操作。如果没有处理过,则更新 DeleteSet 并确认该操作。
「全局一致性:」 通过同步 StateVector,每个客户端都能得知其他客户端的最新状态,从而确保全局一致性。
「垃圾回收:」 借助GC释放内存资源。
垃圾回收机制会定期检查并删除 DeleteSet 和删除确认缓冲区中的无用项:
检查删除状态:确定 DeleteSet 中的每个删除操作是否已经被所有客户端处理。
确认不再需要:检查删除操作是否被完全同步,并且在所有客户端上都被正确处理。
删除无用项:实际删除不再需要的删除记录,释放内存。
回溯历史记录
从上述介绍可以看出,Yjs 能够将文档中的所有 item 结构完整保存下来,这对记录历史变更非常有价值。为此,Yjs 内置了 「UndoManager」,提供了历史记录管理功能。
在了解 UndoManager 的实现方式前,我们回顾一下之前了解的Yjs对于删除操作做的特殊处理,对于需要删除的item采用「item.deleted」 标记,但是item并没有记录过多的删除消息(比如:删除时间、是谁删的等),并且structStore并不会额外记录删除操作,仅仅对其标记处理。Yjs对于删除操作的独特处理,于是便引入了DeleteSet来记录逻辑时间轴上某段时间所有删除的item,即「Yjs 中的删除操作被设计成了独立于双向链表的结构」。
在 Yjs 中,transaction 是用于构建事件系统和处理实时同步的核心抽象。每次更新都会生成一个 transaction,它包含两组数据:在这次更新中插入的 items、在这次更新中删除 items 的 DeleteSet。
实际在网络上分发的增量更新数据,就是序列化后的 transaction。因此,transaction 不仅用于编码和传递更新数据,也作为撤销和重做操作的基本单元。
为了撤销一个 transaction,需要进行以下两个操作:
将这次 transaction 中插入的 items 标记为删除。
将这次更新中删除的 items 标记为恢复。
通过这两步操作,就可以实现对一次 transaction 的完整撤销。
同步网络状态
Yjs 的同步机制设计是其实现高效协作编辑的核心,通过「二进制编码」、「状态向量(StateVector)和两阶段同步协议」,解决了分布式系统中数据同步的效率和一致性问题。CRDT 库本身与具体的网络协议无关,但作为一整套渐进的解决方案,Yjs 也设计了配套的网络协议,如y-websocket、y-webrtc等。
Yjs 将文档的完整状态或增量更新编码为 Uint8Array(二进制数据),这种设计使得数据传输更高效、存储更紧凑。每次文档修改(Transaction)会计算出变化的 Item(插入/更新的内容)和 DeleteSet(删除的内容)
,仅传输这些增量数据,而非完整文档状态。
「两阶段同步协议」
「交换状态向量(State Vector Exchange)」
「目的」:确定双方需要同步的数据范围。
「流程」:
客户端 A 发送自己的 State Vector 给客户端 B。
客户端 B 对比双方的状态向量,找到 A 缺失的操作(即 B 有而 A 没有的操作),反之亦然。
「生成并传输增量更新(Delta Update Generation)」
「目的」:仅传输缺失的增量数据。
「流程」:
客户端 B 根据 A 的 StateVector,从本地历史中提取 A 缺失的 Item 和 DeleteSet。
将这些增量数据编码为二进制 Update,发送给客户端 A。
客户端 A 接收到 Update 后,反序列化并应用到本地文档,最终达成一致。
总结
Yjs 是一种应用于构建自动同步协作程序的高性能 CRDT 技术。它把内部 CRDT 模型以共享数据类型呈现,数据变化时能自动合并且避免冲突。
Yjs 使用双向链表存储数据,每个数据项(Item)都有唯一 ID 等关键信息。为了更高效地管理数据,还采用了 StructStore,为每个客户端分配按逻辑时钟排序的 Item 数组。在解决并发冲突方面,Yata 算法发挥着重要作用,通过多种步骤处理不同场景下的冲突。
双缓存删除机制借助 DeleteSet 和 StateVector 跟踪删除对象、表示文档状态,实现高效的删除管理和垃圾回收。Yjs 内置的 UndoManager 能回溯历史记录,通过特定操作实现对事务的撤销。在网络同步上,Yjs 把数据编码为二进制进行传输,并运用两阶段同步协议,提升了同步的效率和准确性。
点击关注公众号,“技术干货” 及时达!