L1.4 - 订阅机制

核心接口

ISubscriptionService 接口

订阅服务接口提供订阅和取消订阅的能力:

interface ISubscriptionService is IPayable {
    function subscribe(
        uint256 chain_id,    // 监听哪条链
        address _contract,   // 监听哪个合约
        uint256 topic_0,     // 事件签名(keccak256哈希)
        uint256 topic_1,     // 索引参数1
        uint256 topic_2,     // 索引参数2
        uint256 topic_3      // 索引参数3
    ) external;
    
    function unsubscribe(...) external;  // 参数相同
}

IReactive 接口

这是响应式合约接口定义了合约如何接收和处理事件:

interface IReactive is IPayer {
    
    // 事件日志的完整数据结构
    struct LogRecord {
        uint256 chain_id;      // 事件来自哪条链
        address _contract;     // 事件来自哪个合约
        uint256 topic_0;       // 事件类型
        uint256 topic_1;       // 索引参数1
        uint256 topic_2;       // 索引参数2
        uint256 topic_3;       // 索引参数3
        bytes data;            // 非索引的额外数据
        uint256 block_number;  // 事件所在区块号
        uint256 op_code;       // 操作码(事件分类)
        uint256 block_hash;    // 所在区块哈希
        uint256 tx_hash;       // 触发事件的交易哈希
        uint256 log_index;     // 日志在交易中的索引
    }

    // 向目标链发送回调的事件
    event Callback(
        uint256 indexed chain_id,   // 目标链ID
        address indexed _contract,  // 目标合约地址
        uint64 indexed gas_limit,   // 最大gas限制
        bytes payload               // 调用数据
    );
    
    // 处理事件的核心函数
    function react(LogRecord calldata log) external;
}

静态订阅

用法

静态订阅的基本写法如下:

address private _callback;  // RN 专用变量
uint256 public counter;     // ReactVM 专用变量

constructor(
    address _service,
    address _contract,
    uint256 topic_0,
    address callback
) payable {
    service = ISystemContract(payable(_service));
    
    if (!vm) {  // 只在 Reactive Network 上订阅
        service.subscribe(
            CHAIN_ID,
            _contract,
            topic_0,
            REACTIVE_IGNORE,  // 不过滤 topic_1
            REACTIVE_IGNORE,  // 不过滤 topic_2
            REACTIVE_IGNORE   // 不过滤 topic_3
        );
    }
    
    _callback = callback;
}

对于订阅条件的配置,存在三种通配符:

address(0) → 匹配任意合约地址
uint256(0) → 匹配任意链ID
REACTIVE_IGNORE → 匹配任意 topic 值

例如,监听特定合约的所有事件:

service.subscribe(
    CHAIN_ID,
    0x7E0987E5b3a30e3f2828572Bb659A548460a3003,  // 指定合约
    REACTIVE_IGNORE,  // 任意事件类型
    REACTIVE_IGNORE,
    REACTIVE_IGNORE,
    REACTIVE_IGNORE
);

这个订阅的效果是该合约发出的任何事件都会触发 react()

而监听特定事件类型(跨所有合约)的订阅如下:

service.subscribe(
    CHAIN_ID,
    address(0),   // 任意合约
    0x1c411e9a96e071241c2f21f7726b17ae89e3cab4c78be50e062b03a9fffbbad1,  // Uniswap V2 Sync
    REACTIVE_IGNORE,
    REACTIVE_IGNORE,
    REACTIVE_IGNORE
);

效果就是所有合约发出的 Sync 事件都会触发 react() (可监控全网所有 Uniswap V2 Pair 的价格变动)

同时也可以监听特定合约+特定事件的组合:

service.subscribe(
    CHAIN_ID,
    0x7E0987E5b3a30e3f2828572Bb659A548460a3003,  // 指定合约
    0x1c411e9a96e071241c2f21f7726b17ae89e3cab4c78be50e062b03a9fffbbad1,  // 指定事件
    REACTIVE_IGNORE,
    REACTIVE_IGNORE,
    REACTIVE_IGNORE
);

最后,多个订阅也支持同时监听多个来源:

constructor(...) payable {
    if (!vm) {
        // 订阅1:监听合约1的所有事件
        service.subscribe(CHAIN_ID, _contract1, REACTIVE_IGNORE, ...);
        
        // 订阅2:监听全网的某类事件
        service.subscribe(CHAIN_ID, address(0), topic_0, ...);
        
        // 可以继续添加更多订阅...
    }
}

禁止事项

  • 不能用 > < 范围过滤参数假设有一个代币合约,每次有人转账就会发出一个事件: Transfer(address from, address to, uint256 amount)。如果你想监听“金额大于 1000 的大额转账”,在响应式网络不能在 subscribe 订阅时写条件: 你不能要求系统合约(ISystemContract)帮你拦截小于 1000 的转账。它没有做数学运算(>、<、>=)的能力。在订阅时,必须对金额参数使用 REACTIVE_IGNORE(无视金额,全部监听)。系统会把每一笔转账(哪怕只有 1 块钱)都推送给你的合约。再在自己的代码中进行筛选。
  • 不能在单次订阅中用 OR(或)逻辑在写代码时,你不能在一次 subscribe() 调用里监听 A 合约 或者 B 合约。如果既想监听 A,又想监听 B,必须在代码里写两行独立的 subscribe() 函数调用。
  • 不能同时订阅所有链的所有事件假设你把链 ID 设置为 0(任意链),把合约地址设置为 address(0)(任意合约),把事件也全设为 REACTIVE_IGNORE(任意事件)。这意味着你要求系统把全世界所有区块链上的每一次转账、每一次交互统统发给你。这不仅毫无意义,而且会瞬间让网络瘫痪,所以系统直接在源头禁止了这种“贪心”操作。你必须至少提供一个具体的目标(比如指定一个具体的合约,或者一个具体的事件)。
  • 不能订阅某条链上的所有事件和上一条类似。就算你指定了“我只听以太坊(Sepolia)上的”,但不指定具体合约和事件,这也是被禁止的。以太坊上每秒钟发生无数笔交易,把这些全推送给你,你的合约根本处理不过来,你的手续费(Gas)也会在一秒钟内被扣光。
  • 重复订阅:技术上允许但按次收费,需避免该事件发生时, react 函数会被触发两次,同时你要付两次的手续费。因为在区块链底层去查重是非常昂贵的(耗费存储空间),为了省整个网络的钱,系统把“防止重复订阅”的责任交给了开发者,需要在写代码时自己确保逻辑严密。

取消订阅

取消订阅是很贵的,是因为以太坊虚拟机(EVM)的数据存储结构决定的。

当你的合约调用 subscribe 时,系统合约在底层通常是把你这套条件(链 ID、合约地址、Topic 等)打包成一条记录,直接塞到一个列表(数组)的最后面。这是很快捷的操作。

当你的合约调用 unsubscribe 时。系统合约为了找到你当初设定的那条规则,必须经历以下步骤:

  • 第一步:极其昂贵的“搜索”(Storage Read) 区块链上的存储(Storage)不像我们平时的数据库那样有高级的索引功能。系统合约可能需要从头到尾遍历(Loop)整个订阅列表,一条一条地去核对。在 EVM 里,每一次读取底层存储(也就是 SLOAD 操作码)都是要收费的。列表越长,它找得越久,消耗的 Gas 费就越高。
  • 第二步:麻烦的“移除与填坑”(Storage Write) 好不容易找到了你那条记录,把它删掉之后,列表里就出现了一个“空洞”。为了节省空间和保持列表的整洁,智能合约通常会把列表里的最后一条记录搬过来,填补这个空洞,然后再把列表的长度减一。 这个过程涉及到修改存储数据(SSTORE 操作码),而在区块链上,“修改和写入数据”是所有操作里最贵、最耗费 Gas 的,因为全网的节点都要跟着同步修改硬盘。

动态订阅

动态订阅就是运行时根据事件内容,动态增加/删除订阅。

首先,直接实现动态订阅在VN/VM中是不可能的,假设你想在收到某个事件时,立刻新增一个订阅。但是,负责处理事件的 react 函数是运行在 ReactVM(虚拟机)里的,而能办理订阅业务的 ISystemContract(系统合约)只存在于 RN 主网上。虚拟机里的代码,摸不到主网的系统合约。

步骤

动态订阅依然得在构造函数里做一次静态订阅。但这次你订阅的不是具体的业务事件,而是“控制指令”。

constructor(ApprovalService service_) payable {
    owner = msg.sender;
    approval_service = service_;
    
    if (!vm) {
        // 静态订阅①:监听"有人想订阅"这个事件
        service.subscribe(
            SEPOLIA_CHAIN_ID,
            address(approval_service),  // 监听 ApprovalService 合约
            SUBSCRIBE_TOPIC_0,          // 订阅请求事件
            REACTIVE_IGNORE,
            REACTIVE_IGNORE,
            REACTIVE_IGNORE
        );
        
        // 静态订阅②:监听"有人想取消订阅"这个事件
        service.subscribe(
            SEPOLIA_CHAIN_ID,
            address(approval_service),  // 监听 ApprovalService 合约
            UNSUBSCRIBE_TOPIC_0,        // 取消订阅请求事件
            REACTIVE_IGNORE,
            REACTIVE_IGNORE,
            REACTIVE_IGNORE
        );
    }
}

接着,用户发起订阅请求:

用户A 在 Sepolia 链上调用:
ApprovalService.requestSubscribe(userA_address)

ApprovalService 合约内部发出事件:
emit Subscribe(userA_address)  
→ topic_0 = SUBSCRIBE_TOPIC_0
→ topic_1 = userA_address

当VN监听到了这个请求后,将其传递到VM执行react()函数。

function react(LogRecord calldata log) external vmOnly {
    
    //  收到了"有人想订阅"的事件
    if (log.topic_0 == SUBSCRIBE_TOPIC_0) {
        
        // 第①步:从事件中取出想要订阅的用户地址
        address subscriber = address(uint160(log.topic_1));
        // log.topic_1 就是 userA_address!
        
        // 第②步:编码要在 RN 上调用的函数
        bytes memory payload = abi.encodeWithSignature(
            "subscribe(address,address)",
            address(0),    // rvm_id 占位(系统自动填入)
            subscriber     // 要监听的用户地址
        );
        
        // 第③步:发出 Callback,请求 RN 执行
        emit Callback(
            REACTIVE_CHAIN_ID,  // 目标:Reactive Network
            address(this),      // 目标合约:本合约的 RN 实例
            CALLBACK_GAS_LIMIT, // gas 限制
            payload             // 调用数据
        );
    }
    ...
}

ReactVM 自己无法调用 service.subscribe(),但它可以通过 emit Callback 给 RN 发消息,调用其订阅。这里RN收到了callback,然后执行订阅:

function subscribe(address rvm_id, address subscriber) 
    external 
    rnOnly           // 只能在 RN 环境执行
    callbackOnly(rvm_id)  // 只能由 service 以 owner 身份触发
{
    service.subscribe(
        SEPOLIA_CHAIN_ID,
        address(0),                      // 任意合约
        APPROVAL_TOPIC_0,                // ERC20 Approval 事件
        REACTIVE_IGNORE,                 // topic_1 不过滤
        uint256(uint160(subscriber)),    // topic_2 = 指定用户地址!
        REACTIVE_IGNORE                  // topic_3 不过滤
    );
}

新增订阅后,假设现在 userA 在某个 ERC20 合约上调用 approve():

USDC.approve(spender, 1000)

发出事件:

Approval(owner=userA, spender=xxx, value=1000)

→ topic_0 = APPROVAL_TOPIC_0

→ topic_1 = userA (owner)

→ topic_2 = spender

→ data = 1000 (amount,非索引参数)

VN监听到这个调用并传递给VM,这时候VM执行真正的逻辑:

function react(LogRecord calldata log) external vmOnly {
    ...
    // 情况3:收到真正的 Approval 事件
    else {
        // 从 data 字段解码出授权金额
        (uint256 amount) = abi.decode(log.data, (uint256));
        
        // 编码要在 Sepolia 上调用的函数
        bytes memory payload = abi.encodeWithSignature(
            "onApproval(address,address,address,address,uint256)",
            address(0),                      // 占位
            address(uint160(log.topic_2)),   // spender 地址
            address(uint160(log.topic_1)),   // owner(userA)地址
            log._contract,                   // 哪个 token 合约
            amount                           // 授权金额
        );
        
        // 回调 Sepolia 上的 ApprovalService
        emit Callback(
            SEPOLIA_CHAIN_ID,               // 目标:Sepolia 链
            address(approval_service),       // 目标:ApprovalService 合约
            CALLBACK_GAS_LIMIT,
            payload
        );
    }
}

而如果要取消订阅:

// react() 中的取消订阅处理
else if (log.topic_0 == UNSUBSCRIBE_TOPIC_0) {
    bytes memory payload = abi.encodeWithSignature(
        "unsubscribe(address,address)",
        address(0),
        address(uint160(log.topic_1))  // 要取消的用户地址
    );
    emit Callback(REACTIVE_CHAIN_ID, address(this), CALLBACK_GAS_LIMIT, payload);
}

// RN 上的取消订阅函数
function unsubscribe(address rvm_id, address subscriber)
    external
    rnOnly
    callbackOnly(rvm_id)
{
    service.unsubscribe(
        SEPOLIA_CHAIN_ID,
        address(0),
        APPROVAL_TOPIC_0,
        REACTIVE_IGNORE,
        uint256(uint160(subscriber)),  // 移除对该用户的监听
        REACTIVE_IGNORE
    );
}
步骤 发生了什么 在哪里
1 用户发出订阅请求事件 Sepolia 链
2 react() 检测到请求 ReactVM
3 react() emit Callback ReactVM
4 系统转发 Callback Reactive Network
5 subscribe() 被调用 Reactive Network
6 service.subscribe() 新增监听 Reactive Network
7 业务事件触发新的 react() ReactVM
8 回调目标链执行业务逻辑 Sepolia 链

双重验证

在新增监听的时候,需要进行双重验证:

require(msg.sender == address(service), 'Callback only');

首先需要验证直接调用这个函数的人是否是系统合约,也就是callback,防止其他人恶意调用新增订阅。

require(evm_id == owner, 'Wrong EVM ID');

然后还需要检查这张跨链回调单子的最初发起人(evm_id,即在 ReactVM 里触发事件的那个地址),是不是这个合约的真正主人(owner)。因为其他的ReactVM也可以调用callback。