稀土掘金技术社区 03月02日
多人在线协作的神秘代码:Yjs 底层解密
index_new5.html
../../../zaker_core/zaker_tpl_static/wap/tpl_guoji1.html

 

Yjs是高性能的基于操作的CRDT,用于构建协作应用程序。它结合CRDT自动合并能力等,解决传统OT方案的问题。本文围绕Yjs的多方面进行讲解,包括底层结构、解决冲突等。

Yjs将内部CRDT模型公开为可并发操作的共享数据类型,如YMap、YArray等,数据改变时会触发事件并自动合并

Yjs的建模数据结构为双向链表,每个Item记录用户操作数据,客户端接收全部Item可重建一致文档状态

Yjs为每个item分配唯一ID确定逻辑时间轴位置,存在两种索引数据方式:双向链表和StructStore

Yjs通过YATA算法解决并发冲突,当有新Item插入等操作时执行integrate方法确保文档状态一致

原创 Delighted 2025-03-02 09:02 重庆

点击关注公众号,“技术干货” 及时达!

摘要

CRDT(无冲突复制数据类型)是一种能在网络多台计算机上复制的数据结构,副本可独立并行更新且无冲突,最终能达成一致。CRDT 可在多用户同时操作同一页面时保证一致性,支持离线使用和分区容错,能很好地权衡 CAP 定理中的一致性、可用性和分区容错性。

尽管 CRDT 提供了理论上的基础,但具体的实现和应用并不是简单的任务。使用第三方库可以大大简化应用 CRDT 的过程,确保实现的正确性和效率。本文将在初始CRDT的基础上继续来深入了解开发过程中实现协同的第三方库Yjs,围绕Yjs的底层结构、解决冲突、删除机制、Undo/Redo以及网络几部分进行讲解,再了解完底层原理后可以尝试使用react+Yjs实现简单的协同编辑demo。

Yjs 结合了 「CRDT 的自动合并能力」「去中心化架构」「高效增量同步」,解决了传统 OT 方案对中心化服务器的依赖和冲突处理的复杂性,同时提供了强大的离线支持和扩展性,完美契合现代分布式协作应用的需求,因此成为主流选择。

传统 OT 方案是在服务器收集多个用户的操作后,进行排序处理,然后按照这个顺序将操作再同步给其它用户,而Yjs是将其他用户操作传递给本地,在客户端进行冲突处理,也无需注重传递的先后顺序。

Yjs是一种高性能的基于操作的CRDT( operation-based CRDT),常用于构建自动同步的协作应用程序。它将其内部 CRDT 模型公开为可并发操作的共享数据类型。共享数据类型通常有类似于Map或者常见数据类型Array等(即YMap、YArray、YText等,统称为YModel),在这些数据类型发生改变时,会触发事件,并自动合并且不会发生合并冲突,来实现「最终一致性」

Yjs的建模数据结构

Yjs 在工程上建模 CRDT 时所用的基础数据结构:「双向链表」

在 Yjs 中,不论是 YText、YArray 还是 YMap,这些 YModel 实例中的数据都存储在一条双向链表里。粗略地说,「这条链表中的每一项(或者说每个 Item)都唯一地记录了某次用户操作所修改的数据」,某种程度上和区块链有些异曲同工。可以认为对 YModel 的操作,最后都会转为对这条链表的 append、insert、split、merge 等结构变换。链表中每个 item 会被序列化编码后分发,而基于 CRDT 算法的保证,「只要每个客户端最终都能接收到全部的 item,那么不论客户端以何种顺序接收到这些 Item,它们都能重建出完全一致的文档状态(强最终一致性)」

「Item的核心数据结构:」


class Item { constructor(id, origin, rightOrigin, parent, parentSub, content) { this.id = id; // 唯一标识符 { client, clock } this.origin = origin; // 当前操作的左侧位置引用 this.rightOrigin = rightOrigin; // 当前操作的右侧位置引用 this.parent = parent; // 父节点引用 this.parentSub = parentSub; // 子引用 this.content = content; // 实际内容 this.left = null; // 链表中左侧节点 this.right = null; // 链表中右侧节点 } }

    「ID(clientID,clock)」

在多人协作的过程中,Yjs会为每个item分配一个唯一ID来确定其在逻辑时间轴的位置上。


class ID { constructor (client: number, clock: number) { // 客户端的唯一 ID this.client = client // 逻辑时间戳 this.clock = clock } }

更新逻辑为:

这个逻辑相对于上文《初始CRDT》提到的check_state,确保了任意两个item之间在逻辑上的先后关系。

    「content」

content 指 Item 所存储的数据(比如对应插入的字符串),存储了当前 Item 对应的基础类型或者复合类型;

    「position」

left,right(Yjs 的 CRDT 数据结构基于双向链表,依赖于此);origin,originRight,这两项属性是为了解决冲突;parent,parentSub,这两项是为了支持 Yjs 的灵活的嵌套数据结构;

「parent」 属性一般用于标识某个 Item 所属的父级对象。

「parentSub」 属性用于标识当前 Item 在其父对象中的键(key)。通常用于YMap结构中,对于YArray、YText等结构,他们均可通过下标来获取准确位置。

  // 创建一个新的 Yjs 文档    const doc = new Y.Doc();    // 创建一个 YMap    const yMap = doc.getMap('exampleMap');    // 插入元素到 YMap 中    const yText = new Y.Text('Hello, world!');    yMap.set('greeting', yText);    console.log(yText.parent === yMap);  // 输出: true    console.log(yText.parentSub);  // 输出: 'greeting'

以将itemB插入至itemA和itemC为例:

item是如何存储的呢?

由于任意 item 均可用 ID 来排序,因此一种选择是使用 B 树这样具备对数级优良插入和删除时间复杂度的数据结构,将所有的 item 维护在一棵平衡二叉树中。但实践中 Yjs 选择了一种更简单直接的方案,「即为每个 client 分配一个扁平的 item 数组」,相应的结构被称为「StructStore」

    doc.store.clients: {          1111111: [Item, Item, Item], // ...          2222222: [Item, Item, Item] // ...    }

每个item数组都是按clock字段的值来排序的,这样只需要做一次二分查找,就能快速找到某个ID对应的item。而在接收到远程 item 时,Yjs 除了将 item 插入链表,也会将它插入 StructStore 中的相应位置。

因此我们可以认为,Yjs 中存在两种索引数据的方式:

解决并发冲突(Yata算法)

「什么是冲突呢?」

冲突(Conflict)是指当多个用户同时对同一文档的同一位置进行独立操作时,这些操作因顺序或逻辑矛盾导致文档状态无法直接合并的现象。

「冲突的核心原因」

    「并发性」:操作在不同客户端独立产生,网络延迟导致操作到达服务器的顺序不确定。

    「位置重叠」:多个操作作用于相同逻辑位置(如插入到同一字符后)。

    「缺乏全局时序」:没有统一的时钟或顺序保证所有客户端对操作的理解一致。

「YATA 算法」背后的核心思想是通过维护操作的顺序和冲突解决机制,确保所有并发修改能够正确合并,并保持文档在所有副本之间的一致性。当存在新Item 插入、现有Item 更新或删除时执行integrate(transaction,offset) 方法,使得Yjs 能够处理不同客户端的并发操作,并确保最终文档状态一致。

「Y A T A !」 这几个字符每个都对应一个 item(或者说一次字符插入的 operation)。它们通过 left 和 right 字段连接在一起。在插入新字符 T 的时候,Yjs 就会「根据 item 的 ID(逻辑时间戳) 在链表中查找合适的插入位置,将新字符对应的 item 接入链表中。」 接下来根据源码来看integrate(transaction,offset)是如何实现?

tips:「Transaction(事物)」 在 Yjs 中是用于管理和封装一系列对文档的更改操作,一个Transaction关联一个Y.doc,采用structStore存储其对应的操作历史、状态同步。

更新时钟和分割内容(处理偏移量)

  if (offset > 0) {      this.id.clock += offset;      this.left = getItemCleanEnd(transaction, transaction.doc.store, createID(this.id.client, this.id.clock - 1));      this.origin = this.left.lastId;      this.content = this.content.splice(offset);      this.length -= offset;    }

offset偏移量,用于准确确定用户插入或删除的位置。当存在偏移量的时候,会根据偏移量更新clock逻辑时间戳,然后找到新位置下的文档结构的左侧item节点并更新origin为它的ID,最后将内容分割(分割内容为原内容从 offset 位置到结束位置)。

处理冲突(parent存在)

    判断冲突


<!---->
if ((!this.left && (!this.right || this.right.left !== null)) || (this.left && this.left.right !== this.right))

该条件用于检查当前 Item 对象(即 this)的位置是否存在不一致或潜在的冲突。

假设初始链表为B -> C,用户1在开头插入A,但由于用户1网络延迟,此时用户2已经在开头插入了D。

最终链表结构:


D -> B -> C / A

产生的可能原因:头部插入延迟、同步丢失或被覆盖、删除和插入操作并发。

假设初始链表为A -> B -> C,用户1和用户2几乎同时在B和C之间分别插入D和E。

   A -> B -> D -> C               A -> B -> E -> C
// 此时链表连接关系是:// 此时链表连接关系是: B.right = D B.right = E D.left = B E.left = B D.right = C E.right = C C.left = D C.left = E
并发冲突处理 A -> B -> D -> E -> C
// 合并后的链表连接关系是: B.right = D D.left = B D.right = E E.left = D E.right = C C.left = E

对于Item(E)来说this.left.right = D this.right = C,造成冲突。

产生的可能原因:不同用户同时在相同位置插入的冲突。

    冲突与位置检测


<!---->
let left = this.left let o if (left !== null) { o = left.right } else if (this.parentSub !== null) { o =(this.parent)._map.get(this.parentSub) || null while (o !== null && o.left !== null) { o = o.left } } else { o =(this.parent)._start } const conflictingItems = new Set() const itemsBeforeOrigin = new Set()

left被初始化为当前 Item 的左邻接。

「o」 是一个中间变量,用于查找并记录寻找潜在冲突的Item。

「conflictingItems:」 用于记录与 this 存在冲突的 Item。

「itemsBeforeOrigin:」 用于记录追踪遍历过程中所有遇到的 Item。

 while (o !== null && o !== this.right) {        itemsBeforeOrigin.add(o)        conflictingItems.add(o)        if (compareIDs(this.origin, o.origin)) {            if (o.id.client < this.id.client) {                  left = o                  conflictingItems.clear()            } else if (compareIDs(this.rightOrigin, o.rightOrigin)) {                  break           }         } else if (o.origin !== null && itemsBeforeOrigin.has(getItem(transaction.doc.store, o.origin))) {             if (!conflictingItems.has(getItem(transaction.doc.store, o.origin))) {                  left = o                  conflictingItems.clear()            }        } else {                break        }        o = o.right    }    this.left = left

while 循环遍历文档从 o 开始到 this.right 位置的「所有项」,其中 o 最初被设置为 left.right(即 left 后的一个 Item)。目的是找到正确的插入位置并解决可能的冲突。

将正在观察的o添加到conflictingItems、itemsBeforeOrigin,再根据origin是否相同进一步比较ID来决定位置。

循环结束后,更新this.left。

以下情况对应上文三种情况的案例:

进行插入A时

首先,由于left 为null,且不为YMap结构,确定o 的值为(this.parent)._start,确定D与A存在冲突。

进入while循环,并将o 添加到conflictingItems、itemsBeforeOrigin,this.origin和o.origin都为null,假设用户1的clientId小于用户2,且compareIDs(this.rightOrigin, o.rightOrigin)为true,跳出循环,left为null,即确定了插入位置。

向初始为空的数据结构中插入不存在位置冲突。

用户2插入E时

left为B,由于用户1稍快一点插入,o=left.right=D ,确定D和E存在冲突。

进入while循环,从D开始,并将o 添加到conflictingItems、itemsBeforeOrigin,this.origin和o.origin都为B,假设用户1的clientId小于用户2,则更新left为D,o 为C,此时不满足o !== this.right 不再进行循环,最终确定left为D。

由于YMap主要以键值对的方式存储,YMap 的操作依赖于键来确定其内部数据的存储和查找方式,而不依赖于链表的顺序,所以不存在偏移量的概念,在 YMap 中插入新的键值对时,不会默认设置 left 和 right 指针,并且也没有origin和rightorigin 的概念。

假设用户1和用户2并发插入,并且用户1的clientId小于用户2。


const ydoc = new Y.Doc() const ymap = ydoc.getMap('Map') //用户1 ymap.set('key1', 'value1'); //用户2 ymap.set('key1', 'value2');

当用户2插入时,由于存在parentSub,所以o 被确认为value1 ,即value1 与value2 存在冲突。

进入while循环,从value1开始,并将o 添加到conflictingItems、itemsBeforeOrigin,this.origin和o.origin都为null,且满足o.id.client < this.id.client,则left 为value1,随后o 取下一个值为null,跳出循环。

链表结构更新


if (this.left !== null) { const right = this.left.right; this.right = right; this.left.right = this; } else { let r; if (this.parentSub !== null) { r =(this.parent)._map.get(this.parentSub) || null; while (r !== null && r.left !== null) { r = r.left; } } else { r =(this.parent)._start; (this.parent)._start = this; } this.right = r; } if (this.right !== null) { this.right.left = this; } else if (this.parentSub !== null) { (this.parent)._map.set(this.parentSub,this); if (this.left !== null) { this.left.delete(transaction); } } if (this.parentSub === null && this.countable && !this.deleted) { (this.parent)._length += this.length; } if ( ((this.parent)._item !== null &&(this.parent)._item.deleted) || (this.parentSub !== null && this.right !== null) ) { this.delete(transaction); }

如果 this.left 不为空,则将当前项 (this) 插入到文档中,通过调整 left 和 right 的指针来达成。

如果 left 为空(即当前项可能是列表的新起始项),则需要查找合适的 right 项,并更新 parent 的起始项。

如果 this.right 不为空,即当前 Item 有一个在其后面需要直接来连的项,则设置那个后续项的 left 属性指向当前 Item。这样确保链表从左到右和从右到左的链接都是完整的。

如果Item 没有右邻接项(this.right 为 null),同时 this 位于YMap结构中(有 parentSub)的情况。更新当前 parentSub,如果存在left将其删除。(类似于其中一个用户试图删除一个元素,而另一个用户几乎同时在相同的位置尝试添加或更新一个元素。)

调整父节点的长度parent,反映新增加的 Item 的长度。

    「this.parentSub === null」:检查当前 Item 是否没有子项标识符 (parentSub)。这通常意味着当前 Item 不是存储在父 Item 的映射 (_map) 中,而是直接作为子 Item存在。

    「this.countable」:检查当前 Item是否应该被计入 length 总和。countable 是一个布尔值,指示是否应将此 Item 的长度计入总长度统计。这通常用于忽略那些不应计入长度统计的元素(比如一些中间状态或特定数据结构的元素)。

    「!this.deleted」:确保当前 Item 没有被标记为已删除。我们只想为尚未删除的 Item 增加父 Item 的长度计数。

删除操作:

    检查父项是否已被删除:如果 this.parent._item 不为空且标记为已删除,则需要将当前 Item 也删除。

    当前元素不是父映射的当前值:如果当前元素通过键(parentSub)与父元素关联,而 this.right 非空说明在映射中它后面还有其他元素,这可能意味着它被更新的元素覆盖或不再是该键的最新映射值。

数据操作和更新


addStruct(transaction.doc.store, this); this.content.integrate(transaction, this); addChangedTypeToTransaction( transaction, (this.parent), this.parentSub );

特殊情况处理(无parent)

「GC」 「(Garbage Collector,垃圾收集器)」 对象是一种用于处理无效或孤立项目的垃圾收集。垃圾收集器的主要作用是确保这些无效的、逻辑上已删除的元素不会保留在内存中占用资源,并保证数据结构的整洁和一致性。

   new GC(this.id, this.length).integrate(transaction, 0);

当存在无parent属性的Item时,就会创建一个GC对象用于处理它,并调用它「自身的integrate方法」用于将此次垃圾收集操作添加到当前的事物当中。


class GC { constructor (id, length) { this.id = id this.length = length } get deleted () { return true }
delete () {} integrate (transaction, offset) { if (offset > 0) { this.id.clock += offset this.length -= offset } addStruct(transaction.doc.store, this) } }

从源码中看出,GC会将此次操作的deleted属性标记为true,并将其添加至StructStore当中。

双缓存删除机制

双缓存删除机制是一种删除管理和垃圾回收优化技术。它通过维护两个不同的缓存来跟踪和管理被删除的内容:DeleteSetStateVector

「DeleteSet」 :是一个集合,用于跟踪被删除的对象。删除集合中通常包含删除操作的其他属性,例如对象删除的时间戳或长度等。

「StateVector」:是一个向量结构,表示当前文档的状态。对于每个客户端,状态向量记录了它所持有数据的最新版本。这使得系统能够快速识别哪些数据已经同步,哪些数据需要更新。

「执行逻辑」

    「删除标记和确认:」 当一个客户端发起删除操作时,它会更新自己的DeleteSet并将删除操作广播给其他客户端。

    「同步状态:」 接收到删除操作的客户端会检查自己的 StateVector,以决定是否已经处理过该操作。如果没有处理过,则更新 DeleteSet 并确认该操作。

    「全局一致性:」 通过同步 StateVector,每个客户端都能得知其他客户端的最新状态,从而确保全局一致性。

    「垃圾回收:」 借助GC释放内存资源。

    垃圾回收机制会定期检查并删除 DeleteSet 和删除确认缓冲区中的无用项:

回溯历史记录

从上述介绍可以看出,Yjs 能够将文档中的所有 item 结构完整保存下来,这对记录历史变更非常有价值。为此,Yjs 内置了 「UndoManager」,提供了历史记录管理功能。

在了解 UndoManager 的实现方式前,我们回顾一下之前了解的Yjs对于删除操作做的特殊处理,对于需要删除的item采用「item.deleted」 标记,但是item并没有记录过多的删除消息(比如:删除时间、是谁删的等),并且structStore并不会额外记录删除操作,仅仅对其标记处理。Yjs对于删除操作的独特处理,于是便引入了DeleteSet来记录逻辑时间轴上某段时间所有删除的item,即「Yjs 中的删除操作被设计成了独立于双向链表的结构」

在 Yjs 中,transaction 是用于构建事件系统和处理实时同步的核心抽象。每次更新都会生成一个 transaction,它包含两组数据:在这次更新中插入的 items、在这次更新中删除 items 的 DeleteSet。

实际在网络上分发的增量更新数据,就是序列化后的 transaction。因此,transaction 不仅用于编码和传递更新数据,也作为撤销和重做操作的基本单元。

为了撤销一个 transaction,需要进行以下两个操作:

    将这次 transaction 中插入的 items 标记为删除。

    将这次更新中删除的 items 标记为恢复。

通过这两步操作,就可以实现对一次 transaction 的完整撤销。

同步网络状态

Yjs 的同步机制设计是其实现高效协作编辑的核心,通过「二进制编码」「状态向量(StateVector)和两阶段同步协议」,解决了分布式系统中数据同步的效率和一致性问题。CRDT 库本身与具体的网络协议无关,但作为一整套渐进的解决方案,Yjs 也设计了配套的网络协议,如y-websocket、y-webrtc等。

Yjs 将文档的完整状态或增量更新编码为 Uint8Array(二进制数据),这种设计使得数据传输更高效、存储更紧凑。每次文档修改(Transaction)会计算出变化的 Item(插入/更新的内容)和 DeleteSet(删除的内容),仅传输这些增量数据,而非完整文档状态。

「两阶段同步协议」

    「交换状态向量(State Vector Exchange)」

    「目的」:确定双方需要同步的数据范围。

    「流程」

「生成并传输增量更新(Delta Update Generation)」

「目的」:仅传输缺失的增量数据。

「流程」

总结

Yjs 是一种应用于构建自动同步协作程序的高性能 CRDT 技术。它把内部 CRDT 模型以共享数据类型呈现,数据变化时能自动合并且避免冲突。

Yjs 使用双向链表存储数据,每个数据项(Item)都有唯一 ID 等关键信息。为了更高效地管理数据,还采用了 StructStore,为每个客户端分配按逻辑时钟排序的 Item 数组。在解决并发冲突方面,Yata 算法发挥着重要作用,通过多种步骤处理不同场景下的冲突。

双缓存删除机制借助 DeleteSet 和 StateVector 跟踪删除对象、表示文档状态,实现高效的删除管理和垃圾回收。Yjs 内置的 UndoManager 能回溯历史记录,通过特定操作实现对事务的撤销。在网络同步上,Yjs 把数据编码为二进制进行传输,并运用两阶段同步协议,提升了同步的效率和准确性。

点击关注公众号,“技术干货” 及时达!


阅读原文

跳转微信打开

Fish AI Reader

Fish AI Reader

AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。

FishAI

FishAI

鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑

联系邮箱 441953276@qq.com

相关标签

Yjs CRDT 协作应用 解决冲突
相关文章