核心接口
ISubscriptionService 接口
订阅服务接口提供订阅和取消订阅的能力:
interface ISubscriptionService is IPayable {
function subscribe(
uint256 chain_id, // 监听哪条链
address _contract, // 监听哪个合约
uint256 topic_0, // 事件签名(keccak256哈希)
uint256 topic_1, // 索引参数1
uint256 topic_2, // 索引参数2
uint256 topic_3 // 索引参数3
) external;
function unsubscribe(...) external; // 参数相同
}
IReactive 接口
这是响应式合约接口定义了合约如何接收和处理事件:
interface IReactive is IPayer {
// 事件日志的完整数据结构
struct LogRecord {
uint256 chain_id; // 事件来自哪条链
address _contract; // 事件来自哪个合约
uint256 topic_0; // 事件类型
uint256 topic_1; // 索引参数1
uint256 topic_2; // 索引参数2
uint256 topic_3; // 索引参数3
bytes data; // 非索引的额外数据
uint256 block_number; // 事件所在区块号
uint256 op_code; // 操作码(事件分类)
uint256 block_hash; // 所在区块哈希
uint256 tx_hash; // 触发事件的交易哈希
uint256 log_index; // 日志在交易中的索引
}
// 向目标链发送回调的事件
event Callback(
uint256 indexed chain_id, // 目标链ID
address indexed _contract, // 目标合约地址
uint64 indexed gas_limit, // 最大gas限制
bytes payload // 调用数据
);
// 处理事件的核心函数
function react(LogRecord calldata log) external;
}
静态订阅
用法
静态订阅的基本写法如下:
address private _callback; // RN 专用变量
uint256 public counter; // ReactVM 专用变量
constructor(
address _service,
address _contract,
uint256 topic_0,
address callback
) payable {
service = ISystemContract(payable(_service));
if (!vm) { // 只在 Reactive Network 上订阅
service.subscribe(
CHAIN_ID,
_contract,
topic_0,
REACTIVE_IGNORE, // 不过滤 topic_1
REACTIVE_IGNORE, // 不过滤 topic_2
REACTIVE_IGNORE // 不过滤 topic_3
);
}
_callback = callback;
}
对于订阅条件的配置,存在三种通配符:
address(0) → 匹配任意合约地址
uint256(0) → 匹配任意链ID
REACTIVE_IGNORE → 匹配任意 topic 值
例如,监听特定合约的所有事件:
service.subscribe(
CHAIN_ID,
0x7E0987E5b3a30e3f2828572Bb659A548460a3003, // 指定合约
REACTIVE_IGNORE, // 任意事件类型
REACTIVE_IGNORE,
REACTIVE_IGNORE,
REACTIVE_IGNORE
);
这个订阅的效果是该合约发出的任何事件都会触发 react()。
而监听特定事件类型(跨所有合约)的订阅如下:
service.subscribe(
CHAIN_ID,
address(0), // 任意合约
0x1c411e9a96e071241c2f21f7726b17ae89e3cab4c78be50e062b03a9fffbbad1, // Uniswap V2 Sync
REACTIVE_IGNORE,
REACTIVE_IGNORE,
REACTIVE_IGNORE
);
效果就是所有合约发出的 Sync 事件都会触发 react() (可监控全网所有 Uniswap V2 Pair 的价格变动)
同时也可以监听特定合约+特定事件的组合:
service.subscribe(
CHAIN_ID,
0x7E0987E5b3a30e3f2828572Bb659A548460a3003, // 指定合约
0x1c411e9a96e071241c2f21f7726b17ae89e3cab4c78be50e062b03a9fffbbad1, // 指定事件
REACTIVE_IGNORE,
REACTIVE_IGNORE,
REACTIVE_IGNORE
);
最后,多个订阅也支持同时监听多个来源:
constructor(...) payable {
if (!vm) {
// 订阅1:监听合约1的所有事件
service.subscribe(CHAIN_ID, _contract1, REACTIVE_IGNORE, ...);
// 订阅2:监听全网的某类事件
service.subscribe(CHAIN_ID, address(0), topic_0, ...);
// 可以继续添加更多订阅...
}
}
禁止事项
- 不能用
><范围过滤参数假设有一个代币合约,每次有人转账就会发出一个事件:Transfer(address from, address to, uint256 amount)。如果你想监听“金额大于 1000 的大额转账”,在响应式网络不能在subscribe订阅时写条件: 你不能要求系统合约(ISystemContract)帮你拦截小于 1000 的转账。它没有做数学运算(>、<、>=)的能力。在订阅时,必须对金额参数使用REACTIVE_IGNORE(无视金额,全部监听)。系统会把每一笔转账(哪怕只有 1 块钱)都推送给你的合约。再在自己的代码中进行筛选。 - 不能在单次订阅中用
OR(或)逻辑在写代码时,你不能在一次subscribe()调用里监听 A 合约 或者 B 合约。如果既想监听 A,又想监听 B,必须在代码里写两行独立的subscribe()函数调用。 - 不能同时订阅所有链的所有事件假设你把链 ID 设置为
0(任意链),把合约地址设置为address(0)(任意合约),把事件也全设为REACTIVE_IGNORE(任意事件)。这意味着你要求系统把全世界所有区块链上的每一次转账、每一次交互统统发给你。这不仅毫无意义,而且会瞬间让网络瘫痪,所以系统直接在源头禁止了这种“贪心”操作。你必须至少提供一个具体的目标(比如指定一个具体的合约,或者一个具体的事件)。 - 不能订阅某条链上的所有事件和上一条类似。就算你指定了“我只听以太坊(Sepolia)上的”,但不指定具体合约和事件,这也是被禁止的。以太坊上每秒钟发生无数笔交易,把这些全推送给你,你的合约根本处理不过来,你的手续费(Gas)也会在一秒钟内被扣光。
- 重复订阅:技术上允许但按次收费,需避免该事件发生时,
react函数会被触发两次,同时你要付两次的手续费。因为在区块链底层去查重是非常昂贵的(耗费存储空间),为了省整个网络的钱,系统把“防止重复订阅”的责任交给了开发者,需要在写代码时自己确保逻辑严密。
取消订阅
取消订阅是很贵的,是因为以太坊虚拟机(EVM)的数据存储结构决定的。
当你的合约调用 subscribe 时,系统合约在底层通常是把你这套条件(链 ID、合约地址、Topic 等)打包成一条记录,直接塞到一个列表(数组)的最后面。这是很快捷的操作。
当你的合约调用 unsubscribe 时。系统合约为了找到你当初设定的那条规则,必须经历以下步骤:
- 第一步:极其昂贵的“搜索”(Storage Read) 区块链上的存储(Storage)不像我们平时的数据库那样有高级的索引功能。系统合约可能需要从头到尾遍历(Loop)整个订阅列表,一条一条地去核对。在 EVM 里,每一次读取底层存储(也就是
SLOAD操作码)都是要收费的。列表越长,它找得越久,消耗的 Gas 费就越高。 - 第二步:麻烦的“移除与填坑”(Storage Write) 好不容易找到了你那条记录,把它删掉之后,列表里就出现了一个“空洞”。为了节省空间和保持列表的整洁,智能合约通常会把列表里的最后一条记录搬过来,填补这个空洞,然后再把列表的长度减一。 这个过程涉及到修改存储数据(
SSTORE操作码),而在区块链上,“修改和写入数据”是所有操作里最贵、最耗费 Gas 的,因为全网的节点都要跟着同步修改硬盘。
动态订阅
动态订阅就是运行时根据事件内容,动态增加/删除订阅。
首先,直接实现动态订阅在VN/VM中是不可能的,假设你想在收到某个事件时,立刻新增一个订阅。但是,负责处理事件的 react 函数是运行在 ReactVM(虚拟机)里的,而能办理订阅业务的 ISystemContract(系统合约)只存在于 RN 主网上。虚拟机里的代码,摸不到主网的系统合约。
步骤
动态订阅依然得在构造函数里做一次静态订阅。但这次你订阅的不是具体的业务事件,而是“控制指令”。
constructor(ApprovalService service_) payable {
owner = msg.sender;
approval_service = service_;
if (!vm) {
// 静态订阅①:监听"有人想订阅"这个事件
service.subscribe(
SEPOLIA_CHAIN_ID,
address(approval_service), // 监听 ApprovalService 合约
SUBSCRIBE_TOPIC_0, // 订阅请求事件
REACTIVE_IGNORE,
REACTIVE_IGNORE,
REACTIVE_IGNORE
);
// 静态订阅②:监听"有人想取消订阅"这个事件
service.subscribe(
SEPOLIA_CHAIN_ID,
address(approval_service), // 监听 ApprovalService 合约
UNSUBSCRIBE_TOPIC_0, // 取消订阅请求事件
REACTIVE_IGNORE,
REACTIVE_IGNORE,
REACTIVE_IGNORE
);
}
}
接着,用户发起订阅请求:
用户A 在 Sepolia 链上调用:
ApprovalService.requestSubscribe(userA_address)
ApprovalService 合约内部发出事件:
emit Subscribe(userA_address)
→ topic_0 = SUBSCRIBE_TOPIC_0
→ topic_1 = userA_address
当VN监听到了这个请求后,将其传递到VM执行react()函数。
function react(LogRecord calldata log) external vmOnly {
// 收到了"有人想订阅"的事件
if (log.topic_0 == SUBSCRIBE_TOPIC_0) {
// 第①步:从事件中取出想要订阅的用户地址
address subscriber = address(uint160(log.topic_1));
// log.topic_1 就是 userA_address!
// 第②步:编码要在 RN 上调用的函数
bytes memory payload = abi.encodeWithSignature(
"subscribe(address,address)",
address(0), // rvm_id 占位(系统自动填入)
subscriber // 要监听的用户地址
);
// 第③步:发出 Callback,请求 RN 执行
emit Callback(
REACTIVE_CHAIN_ID, // 目标:Reactive Network
address(this), // 目标合约:本合约的 RN 实例
CALLBACK_GAS_LIMIT, // gas 限制
payload // 调用数据
);
}
...
}
ReactVM 自己无法调用 service.subscribe(),但它可以通过 emit Callback 给 RN 发消息,调用其订阅。这里RN收到了callback,然后执行订阅:
function subscribe(address rvm_id, address subscriber)
external
rnOnly // 只能在 RN 环境执行
callbackOnly(rvm_id) // 只能由 service 以 owner 身份触发
{
service.subscribe(
SEPOLIA_CHAIN_ID,
address(0), // 任意合约
APPROVAL_TOPIC_0, // ERC20 Approval 事件
REACTIVE_IGNORE, // topic_1 不过滤
uint256(uint160(subscriber)), // topic_2 = 指定用户地址!
REACTIVE_IGNORE // topic_3 不过滤
);
}
新增订阅后,假设现在 userA 在某个 ERC20 合约上调用 approve():
USDC.approve(spender, 1000)
发出事件:
Approval(owner=userA, spender=xxx, value=1000)
→ topic_0 = APPROVAL_TOPIC_0
→ topic_1 = userA (owner)
→ topic_2 = spender
→ data = 1000 (amount,非索引参数)
VN监听到这个调用并传递给VM,这时候VM执行真正的逻辑:
function react(LogRecord calldata log) external vmOnly {
...
// 情况3:收到真正的 Approval 事件
else {
// 从 data 字段解码出授权金额
(uint256 amount) = abi.decode(log.data, (uint256));
// 编码要在 Sepolia 上调用的函数
bytes memory payload = abi.encodeWithSignature(
"onApproval(address,address,address,address,uint256)",
address(0), // 占位
address(uint160(log.topic_2)), // spender 地址
address(uint160(log.topic_1)), // owner(userA)地址
log._contract, // 哪个 token 合约
amount // 授权金额
);
// 回调 Sepolia 上的 ApprovalService
emit Callback(
SEPOLIA_CHAIN_ID, // 目标:Sepolia 链
address(approval_service), // 目标:ApprovalService 合约
CALLBACK_GAS_LIMIT,
payload
);
}
}
而如果要取消订阅:
// react() 中的取消订阅处理
else if (log.topic_0 == UNSUBSCRIBE_TOPIC_0) {
bytes memory payload = abi.encodeWithSignature(
"unsubscribe(address,address)",
address(0),
address(uint160(log.topic_1)) // 要取消的用户地址
);
emit Callback(REACTIVE_CHAIN_ID, address(this), CALLBACK_GAS_LIMIT, payload);
}
// RN 上的取消订阅函数
function unsubscribe(address rvm_id, address subscriber)
external
rnOnly
callbackOnly(rvm_id)
{
service.unsubscribe(
SEPOLIA_CHAIN_ID,
address(0),
APPROVAL_TOPIC_0,
REACTIVE_IGNORE,
uint256(uint160(subscriber)), // 移除对该用户的监听
REACTIVE_IGNORE
);
}
| 步骤 | 发生了什么 | 在哪里 |
|---|---|---|
| 1 | 用户发出订阅请求事件 | Sepolia 链 |
| 2 | react() 检测到请求 | ReactVM |
| 3 | react() emit Callback | ReactVM |
| 4 | 系统转发 Callback | Reactive Network |
| 5 | subscribe() 被调用 | Reactive Network |
| 6 | service.subscribe() 新增监听 | Reactive Network |
| 7 | 业务事件触发新的 react() | ReactVM |
| 8 | 回调目标链执行业务逻辑 | Sepolia 链 |
双重验证
在新增监听的时候,需要进行双重验证:
require(msg.sender == address(service), 'Callback only');
首先需要验证直接调用这个函数的人是否是系统合约,也就是callback,防止其他人恶意调用新增订阅。
require(evm_id == owner, 'Wrong EVM ID');
然后还需要检查这张跨链回调单子的最初发起人(evm_id,即在 ReactVM 里触发事件的那个地址),是不是这个合约的真正主人(owner)。因为其他的ReactVM也可以调用callback。