Go
The Substreams Go Sink library allows you to programmatically stream a Substreams using the Go programming language. The library handles reconnections and provides best practices for error handling.
The Substreams Sink Examples GitHub repository contains an example that you can use as the starting point to build your custom sink logic. After cloning the repository, move to the go directory.
Run the Program
This example is built as a CLI using the cobra library. Run the program with the following command structure:
go run . sink <ENDPOINT> <SPKG> <MODULE_NAME>
In the following command, go run . compiles and runs the package in the current directory (main.go). The rest of the command (mainnet.eth.streamingfast.io:443, the package URL, and db_out) are the parameters passed to the Go program, separated by whitespace.
In the parameters, you pass the Substreams endpoint, the package, and the module to execute.
go run . sink mainnet.eth.streamingfast.io:443 https://github.com/streamingfast/substreams-eth-block-meta/releases/download/v0.5.1/substreams-eth-block-meta-v0.5.1.spkg db_out
Inspect the Code
The example contains code comments that help you understand the code and adapt it to your own logic. Let's inspect the most important parts of the code:
var expectedOutputModuleType = string(new(pbchanges.DatabaseChanges).ProtoReflect().Descriptor().FullName()) // 1.
// ...code omitted...
func main() {
	logging.InstantiateLoggers()
	Run(
		"sinker",
		"Simple Go sinker sinking data to your terminal",
		Command(sinkRunE,
			"sink <endpoint> <manifest> [<output_module>]",
			"Run the sinker code",
			RangeArgs(2, 3),
			Flags(func(flags *pflag.FlagSet) {
				sink.AddFlagsToSet(flags)
			}),
		),
		OnCommandErrorLogAndExit(zlog),
	)
}
Create a new sink object from the parameters passed to the program:
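As a rough illustration of the step before the sink is constructed, the sketch below resolves the positional parameters using the same rule as RangeArgs(2, 3): the endpoint and manifest are required and the output module is optional. It does not call the sink library. The sinkParams type and the parseSinkArgs function are hypothetical names introduced only for this example, and what the sink does when the module name is left empty is outside the scope of the sketch.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// sinkParams is a hypothetical holder for the positional parameters.
// It is NOT a type from the Substreams sink library.
type sinkParams struct {
	Endpoint     string
	Manifest     string
	OutputModule string // left empty when the third argument is omitted
}

// parseSinkArgs mirrors the RangeArgs(2, 3) rule from the command definition:
// <endpoint> and <manifest> are required, [<output_module>] is optional.
func parseSinkArgs(args []string) (sinkParams, error) {
	if len(args) < 2 || len(args) > 3 {
		return sinkParams{}, errors.New("expected <endpoint> <manifest> [<output_module>]")
	}
	p := sinkParams{Endpoint: args[0], Manifest: args[1]}
	if len(args) == 3 {
		p.OutputModule = args[2]
	}
	return p, nil
}

func main() {
	p, err := parseSinkArgs(os.Args[1:])
	if err != nil {
		fmt.Fprintln(os.Stderr, "error:", err)
		os.Exit(1)
	}
	fmt.Printf("endpoint=%s manifest=%s module=%q\n", p.Endpoint, p.Manifest, p.OutputModule)
}
```

In the real example, the resolved values are then handed to the sink library to build the sinker; refer to the repository for the exact call.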
It is necessary to handle two kinds of Substreams response messages:
blockScopedData: sent by the server whenever a new block is discovered in the blockchain. Contains all the block information that you can decode.
blockUndoSignal: sent every time there is a fork in the blockchain. Because you have probably read incorrect blocks in earlier blockScopedData messages, you must rewind back to the latest valid block.
When you run the sinker, you pass two different functions to handle these messages:
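The two-handler pattern can be sketched without the library, as follows. This is a simplified model, not the example's actual code: the blockScopedData and blockUndoSignal structs are hypothetical stand-ins for the real protobuf messages (which carry more fields), and the store type is an assumed in-memory destination. It shows one handler appending each new block and the other discarding everything after the last valid block when a fork is signaled.

```go
package main

import "fmt"

// Hypothetical stand-ins for the two response messages.
// The real messages are protobuf types with more fields.
type blockScopedData struct {
	Number uint64
	ID     string
}

type blockUndoSignal struct {
	LastValidNumber uint64
}

// store is an assumed in-memory destination that keeps processed
// blocks in ascending block-number order.
type store struct {
	blocks []blockScopedData
}

// handleBlockScopedData records a newly received block.
func (s *store) handleBlockScopedData(d blockScopedData) {
	s.blocks = append(s.blocks, d)
}

// handleBlockUndoSignal rewinds the store: every block after the
// last valid block number is dropped.
func (s *store) handleBlockUndoSignal(u blockUndoSignal) {
	keep := 0
	for keep < len(s.blocks) && s.blocks[keep].Number <= u.LastValidNumber {
		keep++
	}
	s.blocks = s.blocks[:keep]
}

func main() {
	s := &store{}
	s.handleBlockScopedData(blockScopedData{Number: 100, ID: "a"})
	s.handleBlockScopedData(blockScopedData{Number: 101, ID: "b"})
	s.handleBlockScopedData(blockScopedData{Number: 102, ID: "c-forked"})

	// A fork is detected: block 102 was on the wrong branch.
	s.handleBlockUndoSignal(blockUndoSignal{LastValidNumber: 101})

	// The server then sends the block from the canonical branch.
	s.handleBlockScopedData(blockScopedData{Number: 102, ID: "c"})

	for _, b := range s.blocks {
		fmt.Println(b.Number, b.ID)
	}
}
```

A real sink would typically also persist its position in the stream alongside the data, so that a restart resumes from the last processed block; that part is omitted here.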

