This documentation helps you use substreams-sink-kv to write data from your existing substreams into a key-value store and serve it back through Connect-Web/gRPC.
Overview
substreams-sink-kv works by reading the output of a specially designed Substreams module (usually called kv_out) that produces data in a protobuf-encoded structure called sf.substreams.sink.kv.v1.KVOperations.
The data is written to a key-value store. The currently supported KV stores are Badger, BigTable and TiKV.
A Connect-Web interface makes the data available directly from the substreams-sink-kv process. Alternatively, you can consume the data directly from your key-value store.
Requirements
An existing substreams (including substreams.yaml and Rust code) that you want to instrument for substreams-sink-kv.
A key-value store where you want to send your data (a local Badger file can be used for development).
Knowledge about Substreams development (start here)
Note: substreams-eth-block-meta is already instrumented for sink-kv. The changes proposed here are a simplified version of what it implements, so adjust the proposed code to your own substreams.
Import the Cargo module
Add the substreams-sink-kv crate to your Cargo.toml, then add a map handler that outputs KvOperations to your src/lib.rs:
// src/lib.rs
#[path = "kv_out.rs"]
mod kv;

// Imports assumed from the block-meta example; adjust the paths to your own crate.
use crate::pb::block_meta::BlockMeta;
use substreams::errors::Error;
use substreams::store::{self, DeltaProto};
use substreams_sink_kv::pb::kv::KvOperations;

#[substreams::handlers::map]
pub fn kv_out(
    deltas: store::Deltas<DeltaProto<BlockMeta>>,
) -> Result<KvOperations, Error> {
    // Create an empty 'KvOperations' structure
    let mut kv_ops: KvOperations = Default::default();

    // Call a function that will push key-value operations from the deltas
    kv::process_deltas(&mut kv_ops, deltas);

    // Here, we could add more operations to the kv_ops
    // ...

    Ok(kv_ops)
}
Add the kv::process_deltas transformation function referenced in the last snippet:
// src/kv_out.rs
use substreams::proto;
use substreams::store::{self, DeltaProto};
use substreams_sink_kv::pb::kv::KvOperations;

use crate::pb::block_meta::BlockMeta;

pub fn process_deltas(ops: &mut KvOperations, deltas: store::Deltas<DeltaProto<BlockMeta>>) {
    use substreams::pb::substreams::store_delta::Operation;

    for delta in deltas.deltas {
        match delta.operation {
            // KV Operations do not distinguish between Create and Update.
            Operation::Create | Operation::Update => {
                let val = proto::encode(&delta.new_value).unwrap();
                ops.push_new(delta.key, val, delta.ordinal);
            }
            Operation::Delete => ops.push_delete(&delta.key, delta.ordinal),
            x => panic!("unsupported operation {:?}", x),
        }
    }
}
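To make the Create/Update/Delete mapping above easier to reason about without the Substreams toolchain, here is a minimal, self-contained sketch. Every type and function in it (DeltaOp, Delta, KvOp, deltas_to_ops, apply) is a hypothetical stand-in written for illustration, not part of the substreams or substreams-sink-kv crates. The in-memory BTreeMap only approximates what a key-value store would hold after the operations are applied in order.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for a store delta operation (not the substreams type).
#[derive(Debug, Clone, Copy, PartialEq)]
enum DeltaOp {
    Create,
    Update,
    Delete,
}

/// Hypothetical stand-in for a single store delta.
struct Delta {
    op: DeltaOp,
    ordinal: u64,
    key: String,
    new_value: Vec<u8>,
}

/// Hypothetical stand-in for one key-value operation emitted by a `kv_out` module.
#[derive(Debug, PartialEq)]
enum KvOp {
    Set { key: String, value: Vec<u8>, ordinal: u64 },
    Delete { key: String, ordinal: u64 },
}

/// Mirrors the mapping in `process_deltas`: Create and Update both become a Set.
fn deltas_to_ops(deltas: Vec<Delta>) -> Vec<KvOp> {
    deltas
        .into_iter()
        .map(|d| match d.op {
            DeltaOp::Create | DeltaOp::Update => KvOp::Set {
                key: d.key,
                value: d.new_value,
                ordinal: d.ordinal,
            },
            DeltaOp::Delete => KvOp::Delete {
                key: d.key,
                ordinal: d.ordinal,
            },
        })
        .collect()
}

/// Applies the operations, in the order given, to an in-memory map.
fn apply(store: &mut BTreeMap<String, Vec<u8>>, ops: Vec<KvOp>) {
    for op in ops {
        match op {
            KvOp::Set { key, value, .. } => {
                store.insert(key, value);
            }
            KvOp::Delete { key, .. } => {
                store.remove(&key);
            }
        }
    }
}
```

Because a Set overwrites any existing value, a Create followed by an Update on the same key leaves only the latest value in the map, which is why the two cases can share one match arm.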
2023-01-12T10:08:31.803-0500 INFO (sink-kv) starting prometheus metrics server {"listen_addr":"localhost:9102"}
2023-01-12T10:08:31.803-0500 INFO (sink-kv) sink to kv {"dsn": "badger3:///Users/stepd/repos/substreams-sink-kv/badger_data.db", "endpoint": "mainnet.eth.streamingfast.io:443", "manifest_path": "https://github.com/streamingfast/substreams-eth-block-meta/releases/download/v0.4.0/substreams-eth-block-meta-v0.4.0.spkg", "output_module_name": "kv_out", "block_range": ""}
2023-01-12T10:08:31.803-0500 INFO (sink-kv) starting pprof server {"listen_addr":"localhost:6060"}
2023-01-12T10:08:31.826-0500 INFO (sink-kv) reading substreams manifest {"manifest_path": "https://github.com/streamingfast/substreams-eth-block-meta/releases/download/v0.4.0/substreams-eth-block-meta-v0.4.0.spkg"}
2023-01-12T10:08:32.186-0500 INFO (sink-kv) validating output store {"output_store":"kv_out"}
2023-01-12T10:08:32.186-0500 INFO (sink-kv) resolved block range {"start_block":0,"stop_block":0}
2023-01-12T10:08:32.186-0500 INFO (sink-kv) starting to listen on {"addr":"localhost:8000"}
2023-01-12T10:08:32.186-0500 INFO (sink-kv) starting stats service {"runs_each":"2s"}
2023-01-12T10:08:32.186-0500 INFO (sink-kv) no block data buffer provided. since undo steps are possible, using default buffer size {"size": 12}
2023-01-12T10:08:32.186-0500 INFO (sink-kv) starting stats service {"runs_each":"2s"}
2023-01-12T10:08:32.186-0500 INFO (sink-kv) ready, waiting for signal to quit
2023-01-12T10:08:32.186-0500 INFO (sink-kv) launching server {"listen_addr":"localhost:8000"}
2023-01-12T10:08:32.187-0500 INFO (sink-kv) serving plaintext {"listen_addr":"localhost:8000"}
2023-01-12T10:08:32.278-0500 INFO (sink-kv) session init {"trace_id":"a3c59bd7992c433402b70f9541565d2d"}
2023-01-12T10:08:34.186-0500 INFO (sink-kv) substreams sink stats {"db_flush_rate": "10.500 flush/s (21 total)", "data_msg_rate": "0.000 msg/s (0 total)", "progress_msg_rate": "0.000 msg/s (0 total)", "block_rate": "0.000 blocks/s (0 total)", "flushed_entries": 0, "last_block": "None"}
2023-01-12T10:08:34.186-0500 INFO (sink-kv) substreams sink stats {"progress_msg_rate": "16551.500 msg/s (33103 total)", "block_rate": "10941.500 blocks/s (21883 total)", "last_block": "#291883 (66d03f819dde948b297c8d582889246d7ba11a5b947335497f8716a7b608f78e)"}
Note: This writes the data in Badger format to the local folder "./badger_data.db/". You can run rm -rf ./badger_data.db between tests to clean up all existing data.
Look at the stored data
You can scan the whole dataset using the 'Scan' command:
Then, enter a key in the text box. The app currently only decodes eth.block_meta.v1.BlockMeta, so a value of any other type will likely come back as a hex-encoded string.
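If you want to inspect such a hex-encoded value outside the app, the first step is to turn the hex string back into raw bytes before handing them to a protobuf decoder. The sketch below shows that step only. The decode_hex helper is hypothetical and written for illustration, and accepting an optional "0x" prefix is an assumption, not something the app is known to emit.

```rust
/// Hypothetical helper: decodes a hex string (optionally prefixed with "0x")
/// into raw bytes, returning an error message on malformed input.
fn decode_hex(input: &str) -> Result<Vec<u8>, String> {
    // Assumption: tolerate an optional "0x" prefix.
    let digits = input.strip_prefix("0x").unwrap_or(input);

    // Reject anything that is not a hexadecimal digit (this also rules out non-ASCII input).
    if !digits.bytes().all(|b| b.is_ascii_hexdigit()) {
        return Err("input contains non-hexadecimal characters".to_string());
    }
    // Each byte is written as exactly two hex digits.
    if digits.len() % 2 != 0 {
        return Err(format!("odd number of hex digits: {}", digits.len()));
    }

    // Parse each pair of digits into one byte.
    (0..digits.len())
        .step_by(2)
        .map(|i| u8::from_str_radix(&digits[i..i + 2], 16).map_err(|e| e.to_string()))
        .collect()
}
```

The resulting bytes are the protobuf-encoded message stored under the key, so they can then be passed to the decoder generated for your own message type.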
To decode the value of your own data structures, add your .proto files under proto/ and generate TypeScript bindings like this:
Key-value stores aren't the only type of sink that data extracted by Substreams can be piped into. Review the core Substreams sinks documentation for additional information on other types of sinks and sinking strategies.