Key-Value Store

Purpose

This documentation will assist you in using substreams-sink-kv to write data from your existing substreams into a key-value store and serve it back through Connect-Web/gRPC.

Overview

substreams-sink-kv works by reading the output of a specially designed substreams module (usually called kv_out) that produces data in a protobuf-encoded structure called sf.substreams.sink.kv.v1.KVOperations.

The data is written to a key-value store. The currently supported KV stores are Badger, BigTable, and TiKV.

A Connect-Web interface makes the data available directly from the substreams-sink-kv process. Alternatively, you can consume the data directly from your key-value store.
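
To make that contract concrete, here is a minimal sketch, assuming only the push_new and push_delete helpers used later in this guide, of how a module builds a KvOperations value by hand:

// Sketch only: builds a KvOperations value directly, the way the
// kv_out module shown later in this guide does from store deltas.
use substreams_sink_kv::pb::kv::KvOperations;

fn sketch_operations() -> KvOperations {
    let mut ops = KvOperations::default();

    // Queue a key/value write; the ordinal preserves ordering within a block.
    ops.push_new("some:key".to_string(), b"some-value".to_vec(), 1);

    // Queue a key deletion at a later ordinal.
    ops.push_delete(&"stale:key".to_string(), 2);

    ops
}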

Requirements

  • An existing substreams (including substreams.yaml and Rust code) that you want to instrument for substreams-sink-kv.

  • A key-value store where you want to send your data (a local Badger file can be used for development).

  • Knowledge of Substreams development (see the Getting Started section).

  • A Rust installation and compiler.

Installation

  • Install the substreams-sink-kv CLI.

  • Install the substreams CLI.

  • Install grpcurl to easily read the data back from the KV store.

Instrumenting your Substreams

Assumptions

The following instructions assume that you are instrumenting substreams-eth-block-meta, which contains:

A store store_block_meta_end defined like this:

# substreams.yaml
...
  - name: store_block_meta_end
    kind: store
    updatePolicy: set
    valueType: proto:eth.block_meta.v1.BlockMeta
    inputs:
      - source: sf.ethereum.type.v2.Block

An eth.block_meta.v1.BlockMeta protobuf structure like this:

message BlockMeta {
  uint64 number = 1;
  bytes hash = 2;
  bytes parent_hash = 3;
  google.protobuf.Timestamp timestamp = 4;
}

Note: substreams-eth-block-meta is already instrumented for sink-kv; the changes proposed here are a simplified version of what has actually been implemented. Please adjust the proposed code to your own substreams.
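
For reference, the crate::pb::block_meta::BlockMeta type used in the Rust snippets below is the prost-generated binding for this message. Its generated code looks roughly like this (shown as an assumption; your codegen setup may differ in module path and timestamp type):

// src/pb/block_meta.rs (roughly, as generated by prost)
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct BlockMeta {
    #[prost(uint64, tag = "1")]
    pub number: u64,
    #[prost(bytes = "vec", tag = "2")]
    pub hash: ::prost::alloc::vec::Vec<u8>,
    #[prost(bytes = "vec", tag = "3")]
    pub parent_hash: ::prost::alloc::vec::Vec<u8>,
    #[prost(message, optional, tag = "4")]
    pub timestamp: ::core::option::Option<::prost_types::Timestamp>,
}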

Import the Cargo module

  1. Add the substreams-sink-kv crate to your Cargo.toml:

# Cargo.toml

[dependencies]
substreams-sink-kv = "0.1.1"
# ...
  2. Add a map module named kv_out to your substreams.yaml:

# substreams.yaml
...
  - name: kv_out
    kind: map
    inputs:
      - store: store_block_meta_end
        mode: deltas
    output:
      type: proto:sf.substreams.sink.kv.v1.KVOperations
  3. Add a kv_out public function to your src/lib.rs:

// src/lib.rs

#[path = "kv_out.rs"]
mod kv;

use substreams::errors::Error;
use substreams::store::{self, DeltaProto};
use substreams_sink_kv::pb::kv::KvOperations;

use crate::pb::block_meta::BlockMeta;

#[substreams::handlers::map]
pub fn kv_out(
    deltas: store::Deltas<DeltaProto<BlockMeta>>,
) -> Result<KvOperations, Error> {

    // Create an empty 'KvOperations' structure
    let mut kv_ops: KvOperations = Default::default();

    // Call a function that will push key-value operations from the deltas
    kv::process_deltas(&mut kv_ops, deltas);

    // Here, we could add more operations to the kv_ops
    // (a sketch of one such extra operation follows these steps)
    // ...

    Ok(kv_ops)
}
  4. Add the kv::process_deltas transformation function referenced in the previous snippet:

// src/kv_out.rs

use substreams::proto;
use substreams::store::{self, DeltaProto};
use substreams_sink_kv::pb::kv::KvOperations;

use crate::pb::block_meta::BlockMeta;

pub fn process_deltas(ops: &mut KvOperations, deltas: store::Deltas<DeltaProto<BlockMeta>>) {
    use substreams::pb::substreams::store_delta::Operation;

    for delta in deltas.deltas {
        match delta.operation {
            // KV Operations do not distinguish between Create and Update.
            Operation::Create | Operation::Update => {
                let val = proto::encode(&delta.new_value).unwrap();
                ops.push_new(delta.key, val, delta.ordinal);
            }
            Operation::Delete => ops.push_delete(&delta.key, delta.ordinal),
            x => panic!("unsupported operation {:?}", x),
        }
    }
}
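
As noted in the kv_out module above, operations beyond those derived from deltas can also be appended. Here is a hedged sketch of such a helper; the month:first:<YYYYMM> key scheme is hypothetical, chosen only to resemble the day:first:... keys queried later in this guide:

// src/kv_out.rs (sketch only)

use substreams::proto;
use substreams_sink_kv::pb::kv::KvOperations;

use crate::pb::block_meta::BlockMeta;

// Hypothetical helper: records `meta` under one hand-crafted key, in
// addition to the delta-derived operations pushed by process_deltas.
pub fn push_month_first(ops: &mut KvOperations, yyyymm: &str, meta: &BlockMeta, ordinal: u64) {
    let key = format!("month:first:{}", yyyymm); // illustrative key scheme
    let val = proto::encode(meta).expect("encoding BlockMeta");
    ops.push_new(key, val, ordinal);
}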

Test your substreams

  1. Compile your Rust code:

cargo build --release --target=wasm32-unknown-unknown
  2. Run it with the substreams CLI directly:

substreams run -e mainnet.eth.streamingfast.io:443 substreams.yaml kv_out --start-block 1000000 --stop-block +1

Note: To connect to a public StreamingFast Substreams endpoint, you will need an authentication token; follow this guide to obtain one.
  3. Run it with substreams-sink-kv:

substreams-sink-kv \
  run \
  "badger3://$(pwd)/badger_data.db" \
  mainnet.eth.streamingfast.io:443 \
  substreams.yaml \
  kv_out

You should see output similar to this one:

2023-01-12T10:08:31.803-0500 INFO (sink-kv) starting prometheus metrics server {"listen_addr": "localhost:9102"}
2023-01-12T10:08:31.803-0500 INFO (sink-kv) sink to kv {"dsn": "badger3:///Users/stepd/repos/substreams-sink-kv/badger_data.db", "endpoint": "mainnet.eth.streamingfast.io:443", "manifest_path": "https://github.com/streamingfast/substreams-eth-block-meta/releases/download/v0.4.0/substreams-eth-block-meta-v0.4.0.spkg", "output_module_name": "kv_out", "block_range": ""}
2023-01-12T10:08:31.803-0500 INFO (sink-kv) starting pprof server {"listen_addr": "localhost:6060"}
2023-01-12T10:08:31.826-0500 INFO (sink-kv) reading substreams manifest {"manifest_path": "https://github.com/streamingfast/substreams-eth-block-meta/releases/download/v0.4.0/substreams-eth-block-meta-v0.4.0.spkg"}
2023-01-12T10:08:32.186-0500 INFO (sink-kv) validating output store {"output_store": "kv_out"}
2023-01-12T10:08:32.186-0500 INFO (sink-kv) resolved block range {"start_block": 0, "stop_block": 0}
2023-01-12T10:08:32.186-0500 INFO (sink-kv) starting to listen on {"addr": "localhost:8000"}
2023-01-12T10:08:32.186-0500 INFO (sink-kv) starting stats service {"runs_each": "2s"}
2023-01-12T10:08:32.186-0500 INFO (sink-kv) no block data buffer provided. since undo steps are possible, using default buffer size {"size": 12}
2023-01-12T10:08:32.186-0500 INFO (sink-kv) starting stats service {"runs_each": "2s"}
2023-01-12T10:08:32.186-0500 INFO (sink-kv) ready, waiting for signal to quit
2023-01-12T10:08:32.186-0500 INFO (sink-kv) launching server {"listen_addr": "localhost:8000"}
2023-01-12T10:08:32.187-0500 INFO (sink-kv) serving plaintext {"listen_addr": "localhost:8000"}
2023-01-12T10:08:32.278-0500 INFO (sink-kv) session init {"trace_id": "a3c59bd7992c433402b70f9541565d2d"}
2023-01-12T10:08:34.186-0500 INFO (sink-kv) substreams sink stats {"db_flush_rate": "10.500 flush/s (21 total)", "data_msg_rate": "0.000 msg/s (0 total)", "progress_msg_rate": "0.000 msg/s (0 total)", "block_rate": "0.000 blocks/s (0 total)", "flushed_entries": 0, "last_block": "None"}
2023-01-12T10:08:34.186-0500 INFO (sink-kv) substreams sink stats {"progress_msg_rate": "16551.500 msg/s (33103 total)", "block_rate": "10941.500 blocks/s (21883 total)", "last_block": "#291883 (66d03f819dde948b297c8d582889246d7ba11a5b947335497f8716a7b608f78e)"}

Note: This writes the data to a local folder, ./badger_data.db/, in Badger format. You can run rm -rf ./badger_data.db between tests to clean up all existing data.

  4. Look at the stored data:

You can scan the whole dataset using the 'Scan' command:

grpcurl --plaintext -d '{"begin": "", "limit":100}' localhost:8000 sf.substreams.sink.kv.v1.Kv/Scan

You can look at data by key prefix:

grpcurl --plaintext -d '{"prefix": "day:first:201511", "limit":31}' localhost:8000 sf.substreams.sink.kv.v1.Kv/GetByPrefix
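
If the sf.substreams.sink.kv.v1.Kv service also exposes a single-key Get method taking a key field (check the service definition in the substreams-sink-kv repository for the exact request shape), an individual entry can be fetched like this (the key shown is illustrative):

grpcurl --plaintext -d '{"key": "day:first:20151101"}' localhost:8000 sf.substreams.sink.kv.v1.Kv/Get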

Consume the key-value data from a web page using Connect-Web

The Connect-Web library allows you to quickly bootstrap a web-based client for your key-value store.

Requirements

  • NodeJS

  • npm

  • buf CLI

Start from our example for substreams-eth-block-meta

You can check out and run our connect-web-example like this:

git clone git@github.com:streamingfast/substreams-sink-kv
cd substreams-sink-kv/connect-web-example
npm install
npm run dev

Then, enter a key in the text box. The app currently only decodes eth.block_meta.v1.BlockMeta; for any other type, you will likely receive the corresponding value encoded as a hex string.

To decode the values of your own data structures, add your .proto files under proto/ and regenerate the TypeScript bindings like this:

npm run buf:generate

You should see this output:

> connect-web-example@0.0.0 buf:generate
> buf generate ../proto/substreams/sink/kv/v1 && buf generate ./proto

Then, modify the code in src/App.tsx to decode your custom type, from this:

    import { BlockMeta } from "../gen/block_meta_pb";

    ...

    const blkmeta = BlockMeta.fromBinary(response.value);
    output = JSON.stringify(blkmeta, (key, value) => {
        if (key === "hash") {
            return "0x" + bufferToHex(blkmeta.hash);
        }
        if (key === "parentHash") {
            return "0x" + bufferToHex(blkmeta.parentHash);
        }
        return value;
    }, 2);

to this:

    import { MyData } from "../gen/my_data_pb";

    ...

    const decoded = MyData.fromBinary(response.value);
    output = JSON.stringify(decoded, null, 2);
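
For context, the response used in these snippets comes from a Connect-Web call against the sink's Kv service. Here is a minimal sketch, assuming the @bufbuild/connect-web package and a generated Kv service stub under gen/ (the exact import path depends on your buf configuration and is an assumption here):

    // Sketch only: the generated stub's import path is hypothetical.
    import { createConnectTransport, createPromiseClient } from "@bufbuild/connect-web";
    import { Kv } from "../gen/kv_connectweb";

    const transport = createConnectTransport({ baseUrl: "http://localhost:8000" });
    const client = createPromiseClient(Kv, transport);

    // response.value holds the raw protobuf bytes handed to fromBinary() above.
    const response = await client.get({ key: "month:first:201511" });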

Bootstrap your own application

If you want to start with an empty application, you can follow these instructions.

Sending to a production key-value store

Until now, we've used the Badger database as a store, for simplicity. However, substreams-sink-kv also supports TiKV and BigTable, using DSNs like these:

  • tikv://pd0,pd1,pd2:2379?prefix=namespace_prefix

  • bigkv://project.instance/namespace-prefix?createTables=true

See kvdb for more details.

Conclusion and review

The ability to route data extracted from the blockchain by using Substreams is powerful and useful. Key-value stores aren't the only type of sink the data extracted by Substreams can be piped into. Review the core Substreams sinks documentation for additional information on other types of sinks and sinking strategies.
