Subscriptions
This document covers all subscription methods and filtering options available in ThorPulse.
Simple Subscriptions
Simple subscriptions connect directly to NATS topic patterns. They're easy to use but don't support server-side filtering.
Slots
// All slot updates
slots := client.SubscribeSlotsChannel(ctx)
// Confirmed slots only
slots := client.SubscribeSlotsConfirmedChannel(ctx)Topic patterns:
slots.>- All slot eventsslots.*.confirmed- Confirmed slots onlyslots.*.finalized- Finalized slots onlyslots.{slot_number}.{status}- Specific slot
Transactions
// All transactions (high volume!)
txs := client.SubscribeAllTransactionsChannel(ctx)
// By program
txs := client.SubscribeProgramChannel(ctx, "675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8")
// By account
txs := client.SubscribeAccountTransactionsChannel(ctx, pubkey)Topic patterns:
txs.slot.>- All transactionstxs.program.{program_id}.>- By programtxs.account.{pubkey}.>- By accounttxs.sig.{signature}.{slot}- Specific transaction
Entries
entries := client.SubscribeEntriesChannel(ctx)Topic patterns:
entries.>- All entriesentries.{slot}.{index}- Specific entry
Accounts
// All accounts (very high volume!)
accounts := client.SubscribeAccountsChannel(ctx)Topic patterns:
accounts.>- All account updatesaccounts.pubkey.{pubkey}.{slot}- Specific accountaccounts.owner.{program}.{slot}- By owner program
SubscribeRequest (Yellowstone-Style)
SubscribeRequest enables server-side filtering, reducing bandwidth and latency. It's the recommended approach for production.
Basic Usage
request := client.NewSubscribeRequest().
AllSlots("my-slots").
Transactions("my-txs", client.TransactionFilter{
Vote: boolPtr(false),
}).
Entries("my-entries").
Commitment(client.CommitmentConfirmed)
subscription, err := client.Subscribe(ctx, request)Filter Types
Slot Filter
request.Slots("slot-filter", client.SlotFilter{
FilterByCommitment: true, // Only at commitment level
})
// Shorthand for all slots
request.AllSlots("all-slots")Transaction Filter
request.Transactions("tx-filter", client.TransactionFilter{
Vote: boolPtr(false), // Exclude votes
Failed: boolPtr(false), // Only successful
Signature: "5UGT...", // Specific signature
AccountInclude: []string{"675k..."}, // Include if ANY match
AccountExclude: []string{"Vote..."}, // Exclude if ANY match
AccountRequired: []string{"Toke..."}, // Require ALL
})Vote
*bool
true=votes only, false=non-votes only
Failed
*bool
true=failed only, false=successful only
Signature
string
Filter for specific signature
AccountInclude
[]string
Include if ANY account present
AccountExclude
[]string
Exclude if ANY account present
AccountRequired
[]string
Require ALL accounts present
Account Filter
request.Accounts("acc-filter", client.AccountFilter{
Accounts: []string{"pubkey1", "pubkey2"}, // Specific accounts
Owners: []string{"TokenkegQf..."}, // By owner program
Filters: []client.AccountDataFilter{
{Datasize: uint64Ptr(165)}, // By data size
{Memcmp: &client.MemcmpFilter{
Offset: 0,
Bytes: []byte{1, 2, 3},
}},
{TokenAccountState: true}, // SPL tokens only
{Lamports: &client.LamportsFilter{
Eq: uint64Ptr(0), // Zero balance
}},
},
})Datasize
Match exact account data size
Memcmp
Match bytes at offset
TokenAccountState
Only valid SPL token accounts
Lamports.Eq
Exact lamport balance
Lamports.Ne
Not equal to balance
Lamports.Lt
Less than balance
Lamports.Gt
Greater than balance
Entry Filter
request.Entries("entry-filter")Entries don't have filter options - you receive all entries or none.
Block Filter
request.Blocks("block-filter", client.BlockFilter{
AccountInclude: []string{},
IncludeTransactions: true,
IncludeAccounts: false,
IncludeEntries: false,
})Block Meta Filter
request.BlocksMeta("meta-filter")Block metadata is lighter than full blocks - contains slot, blockhash, parent, rewards summary.
Commitment Levels
request.Commitment(client.CommitmentProcessed) // Fastest, may rollback
request.Commitment(client.CommitmentConfirmed) // Balanced
request.Commitment(client.CommitmentFinalized) // Slowest, irreversibleProcessed
Transaction processed by leader
Real-time display
Confirmed
Confirmed by supermajority
Trading, analytics
Finalized
Guaranteed irreversible
Settlement, accounting
Handling Updates
Go Handler Pattern
err := subscription.Run(ctx, func(update *proto.SubscribeUpdate) error {
switch u := update.UpdateOneof.(type) {
case *proto.SubscribeUpdate_Slot:
fmt.Printf("Slot: %d\n", u.Slot.Slot)
case *proto.SubscribeUpdate_Transaction:
fmt.Printf("TX: %x\n", u.Transaction.Transaction.Signature[:8])
case *proto.SubscribeUpdate_Account:
fmt.Printf("Account: %x\n", u.Account.Account.Pubkey[:8])
case *proto.SubscribeUpdate_Entry:
fmt.Printf("Entry: slot %d index %d\n", u.Entry.Slot, u.Entry.Index)
case *proto.SubscribeUpdate_Block:
fmt.Printf("Block: %d\n", u.Block.Slot)
case *proto.SubscribeUpdate_Pong:
// Keep-alive, ignore
}
return nil
})Rust Stream Pattern
use futures::StreamExt;
use thorpulse_client::proto::subscribe_update::UpdateOneof;
while let Some(result) = subscription.next().await {
let update = result?;
match update.update_oneof {
Some(UpdateOneof::Slot(slot)) => {
println!("Slot: {}", slot.slot);
}
Some(UpdateOneof::Transaction(tx)) => {
println!("TX in slot: {}", tx.slot);
}
Some(UpdateOneof::Account(acc)) => {
println!("Account in slot: {}", acc.slot);
}
Some(UpdateOneof::Entry(entry)) => {
println!("Entry: {} / {}", entry.slot, entry.index);
}
Some(UpdateOneof::Pong(_)) => {
// Keep-alive
}
_ => {}
}
}Performance Considerations
Subscription Limits
Each tier has subscription limits:
Flash: 100 concurrent subscriptions
Thor: 250 concurrent subscriptions
Prime: Unlimited
Topic Selection
Start specific, widen if needed:
Buffer Sizing
For high-volume subscriptions:
// Go: Use buffered channels
slots := make(chan *proto.SubscribeUpdateSlot, 1000)// Rust: Process quickly or buffer
let (tx, rx) = tokio::sync::mpsc::channel(1000);Backpressure
If you can't keep up:
Common Patterns
DEX Trading
request := client.NewSubscribeRequest().
AllSlots("slots").
Transactions("raydium", client.TransactionFilter{
Vote: boolPtr(false),
Failed: boolPtr(false),
AccountInclude: []string{
"675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8", // Raydium AMM
},
}).
Commitment(client.CommitmentConfirmed)Token Monitoring
request := client.NewSubscribeRequest().
Accounts("tokens", client.AccountFilter{
Owners: []string{"TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"},
Filters: []client.AccountDataFilter{
{Datasize: uint64Ptr(165)}, // Token account size
},
}).
Commitment(client.CommitmentConfirmed)Wallet Tracking
wallets := []string{"wallet1...", "wallet2...", "wallet3..."}
request := client.NewSubscribeRequest().
Accounts("wallets", client.AccountFilter{
Accounts: wallets,
}).
Transactions("wallet-txs", client.TransactionFilter{
AccountInclude: wallets,
})Last updated