The Substreams JavaScript library enables you to run a Substreams, just like you would through the CLI, but using JavaScript.
The library works on both the client side and the server side, with some small differences. Clone the Substreams Sink Examples repository, which contains examples in several programming languages. Then, move to the javascript folder.
Depending on your needs, you can use the node directory (which contains an example using server-side NodeJS) or the web directory (which contains an example using client-side JavaScript).
Install the dependencies
The package.json contains all the necessary dependencies to run the application.
The NodeJS example uses @connectrpc/connect-node, while the Web example uses @connectrpc/connect-web.
The Web example uses ViteJS to create a development server that runs the application:
npm run dev
Then, you can navigate to https://localhost:5173. You will start receiving data!
Explore the Application
When you consume a Substreams package, a long-lived gRPC connection is established. Disconnections will therefore happen and should be treated as normal. Substreams keeps track of the latest block you consumed by sending a cursor to your application. You must persist the cursor so that, after a disconnection, you can restart the application from the latest consumed block.
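As a rough illustration of cursor persistence, the sketch below stores the cursor in a local file using Node's fs module. The file name cursor.txt and the writeCursor helper are assumptions made for this example, not part of the Substreams library; getCursor mirrors the name used by the example application, but its body here is only a guess at one possible implementation. Any durable store (a database, a key-value service) would work equally well.

```javascript
const fs = require("node:fs");

// Hypothetical location of the cursor file; any durable store works.
const CURSOR_FILE = "cursor.txt";

// Returns the last persisted cursor, or undefined on the first run.
const getCursor = (file = CURSOR_FILE) => {
    try {
        const cursor = fs.readFileSync(file, "utf8").trim();
        return cursor === "" ? undefined : cursor;
    } catch (e) {
        if (e.code === "ENOENT") return undefined; // No cursor saved yet
        throw e;
    }
};

// Persists the cursor. Writing to a temporary file and then renaming it
// avoids leaving a half-written cursor if the process dies mid-write.
const writeCursor = (cursor, file = CURSOR_FILE) => {
    const tmp = `${file}.tmp`;
    fs.writeFileSync(tmp, cursor);
    fs.renameSync(tmp, file);
};
```

In this sketch, writeCursor would be called after each block has been fully processed, so that a restart never skips a block whose data was not yet handled.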
In the NodeJS example, the index.js file contains the main() function, which runs an infinite loop and takes care of managing the disconnections.
const TOKEN = process.env.SUBSTREAMS_API_TOKEN // Substreams token. By default, it is read from the SUBSTREAMS_API_TOKEN environment variable of your system
const ENDPOINT = "https://mainnet.eth.streamingfast.io" // Substreams endpoint. In this case, Ethereum mainnet
const SPKG = "https://spkg.io/streamingfast/ethereum-explorer-v0.1.2.spkg" // Substreams package. In this case, taken from the substreams.dev registry
const MODULE = "map_block_meta"
const START_BLOCK = '100000'
const STOP_BLOCK = '+10000'

/*
    Entrypoint of the application.
    Because of the long-running connection, Substreams will disconnect from time to time.
    The application MUST handle disconnections and commit the provided cursor to avoid missing information.
*/
const main = async () => {
    const pkg = await fetchPackage() // Download spkg
    const registry = createRegistry(pkg);

    // Create gRPC connection
    const transport = createConnectTransport({
        baseUrl: ENDPOINT,
        interceptors: [createAuthInterceptor(TOKEN)],
        useBinaryFormat: true,
        jsonOptions: {
            typeRegistry: registry,
        },
    });

    // The infinite loop handles disconnections. Every time a disconnection error is thrown, the loop will automatically reconnect
    // and start consuming from the latest committed cursor.
    while (true) {
        try {
            await stream(pkg, registry, transport);
        } catch (e) {
            if (!isErrorRetryable(e)) {
                console.log(`A fatal error occurred: ${e}`)
                throw e
            }
            console.log(`A retryable error occurred (${e}), retrying after backoff`)
            console.log(e)
            // Add backoff from an easy-to-use library
        }
    }
}
In the Web example, the main.js file contains the main() function, which runs an infinite loop and takes care of managing the disconnections.
const TOKEN = "<SUBSTREAMS-TOKEN>" // Substreams token. Put your Substreams API token here.
const ENDPOINT = "https://mainnet.eth.streamingfast.io" // Substreams endpoint. In this case, Ethereum mainnet
const SPKG = "https://spkg.io/streamingfast/ethereum-explorer-v0.1.2.spkg" // Substreams package. In this case, taken from the substreams.dev registry
const MODULE = "map_block_meta"
const START_BLOCK = '100000'
const STOP_BLOCK = '+10000'

/*
    Entrypoint of the application.
    Because of the long-running connection, Substreams will disconnect from time to time.
    The application MUST handle disconnections and commit the provided cursor to avoid missing information.
*/
const main = async () => {
    const pkg = await fetchPackage(); // Download spkg
    const registry = createRegistry(pkg);

    const transport = createConnectTransport({
        baseUrl: ENDPOINT,
        interceptors: [createAuthInterceptor(TOKEN)],
        useBinaryFormat: true,
        jsonOptions: {
            typeRegistry: registry,
        },
    });

    // The infinite loop handles disconnections. Every time a disconnection error is thrown, the loop will automatically reconnect
    // and start consuming from the latest committed cursor.
    while (true) {
        try {
            await stream(pkg, registry, transport);
        } catch (e) {
            if (!isErrorRetryable(e)) {
                console.log(`A fatal error occurred: ${e}`)
                throw e
            }
            console.log(`A retryable error occurred (${e}), retrying after backoff`)
            console.log(e)
        }
    }
}
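The retry loops above log that they retry "after backoff" but do not actually wait between attempts. A minimal sketch of an exponential backoff is shown below. The names sleep, backoffDelay and runWithBackoff, as well as the 500 ms base and 30 s cap, are assumptions chosen for illustration and are not part of the Substreams library. Unlike the loops above, this helper returns once the task completes without error.

```javascript
// Resolves after the given number of milliseconds.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Exponential backoff: baseMs * 2^attempt, capped at maxMs.
const backoffDelay = (attempt, baseMs = 500, maxMs = 30000) =>
    Math.min(maxMs, baseMs * 2 ** attempt);

// Runs `task` until it completes, waiting between retryable failures.
// `isRetryable` decides whether an error is worth another attempt.
// `wait` is injectable so the delay can be replaced in tests.
const runWithBackoff = async (
    task,
    isRetryable,
    { baseMs = 500, maxMs = 30000, wait = sleep } = {}
) => {
    let attempt = 0;
    while (true) {
        try {
            return await task();
        } catch (e) {
            if (!isRetryable(e)) throw e; // Fatal error: give up
            await wait(backoffDelay(attempt, baseMs, maxMs));
            attempt += 1;
        }
    }
};
```

With helpers like these, the body of main() could be reduced to something like runWithBackoff(() => stream(pkg, registry, transport), isErrorRetryable). A production version would probably also reset the attempt counter once blocks start flowing again.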
The stream() function establishes the actual streaming connection by calling the streamBlocks function. The function yields StatefulResponse objects, each of which contains two messages: a progress message, which carries useful information about the Substreams execution and is handled by handleProgressMessage(), and a response message, which carries the message sent from the server and is decoded by handleResponseMessage().
const stream = async (pkg, registry, transport) => {
    const request = createRequest({
        substreamPackage: pkg,
        outputModule: MODULE,
        productionMode: true,
        startBlockNum: START_BLOCK,
        stopBlockNum: STOP_BLOCK,
        startCursor: getCursor() ?? undefined
    });

    // Stream the blocks
    for await (const statefulResponse of streamBlocks(transport, request)) {
        /*
            Decode the response and handle the message.
            There are different types of response messages that you can receive. You can read more about the response message in the docs:
            https://substreams.streamingfast.io/documentation/consume/reliability-guarantees#the-response-format
        */
        await handleResponseMessage(statefulResponse.response, registry);

        /*
            Handle the progress message.
            Regardless of the response message, the progress message is always sent, and gives you useful information about the execution of the Substreams.
        */
        handleProgressMessage(statefulResponse.progress, registry);
    }
}
The server can send different kinds of response messages. The most common ones are blockScopedData and blockUndoSignal:
blockScopedData: sent by the server whenever a new block is discovered in the blockchain. Contains all the block information that you can decode.
blockUndoSignal: sent every time there is a fork in the blockchain. Because you have probably read incorrect blocks in the blockScopedData message, you must rewind back to the latest valid block.
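To make the two message types concrete, here is a minimal sketch of a handler that keeps processed blocks in memory and rewinds them when an undo signal arrives. It assumes the response exposes its payload as message.case and message.value, that blockScopedData carries clock, cursor and output fields, and that blockUndoSignal carries lastValidBlock and lastValidCursor. These shapes are assumptions for illustration and should be checked against the response format documentation. createBlockStore is a hypothetical helper, not a library function.

```javascript
// In-memory store of processed blocks, keyed by block number (illustrative only).
const createBlockStore = () => {
    const blocks = new Map();
    let cursor;

    const handle = (response) => {
        const message = response.message;
        switch (message.case) {
            case "blockScopedData": {
                // A new block: record its output and remember the cursor.
                const { clock, cursor: newCursor, output } = message.value;
                blocks.set(Number(clock.number), output);
                cursor = newCursor;
                break;
            }
            case "blockUndoSignal": {
                // A fork: drop every block after the last valid one
                // and rewind the cursor accordingly.
                const { lastValidBlock, lastValidCursor } = message.value;
                const lastValid = Number(lastValidBlock.number);
                for (const number of [...blocks.keys()]) {
                    if (number > lastValid) blocks.delete(number);
                }
                cursor = lastValidCursor;
                break;
            }
            default:
                // Other message types are ignored in this sketch.
                break;
        }
    };

    return { handle, blocks, getCursor: () => cursor };
};
```

A real application would apply the same rewind to its database or downstream sink, and would persist the cursor returned by getCursor() after each message.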