262 lines
5 KiB
Python
262 lines
5 KiB
Python
"""This module contains code for parsing RPC responses."""
|
|
from dataclasses import dataclass, field
|
|
from typing import Union, Tuple, Any, Dict, List, Optional, Literal
|
|
|
|
from apischema import alias
|
|
from apischema.conversions import as_str
|
|
|
|
|
|
from solana.publickey import PublicKey
|
|
from solana.transaction import TransactionSignature
|
|
|
|
as_str(PublicKey)
|
|
|
|
TransactionErrorResult = Optional[dict]
|
|
|
|
|
|
@dataclass
|
|
class TransactionErr:
|
|
"""Container for possible transaction errors."""
|
|
|
|
err: TransactionErrorResult
|
|
|
|
|
|
@dataclass
|
|
class Context:
|
|
"""RPC result context."""
|
|
|
|
slot: int
|
|
|
|
|
|
@dataclass
|
|
class WithContext:
|
|
"""Base class for RPC result including context."""
|
|
|
|
context: Context
|
|
|
|
|
|
@dataclass
|
|
class AccountInfo:
|
|
"""Account information."""
|
|
|
|
lamports: int
|
|
owner: PublicKey
|
|
data: Union[Literal[""], Tuple[str, str], Dict[str, Any]]
|
|
executable: bool
|
|
rent_epoch: int = field(metadata=alias("rentEpoch"))
|
|
|
|
|
|
@dataclass
|
|
class AccountInfoAndContext(WithContext):
|
|
"""Account info and RPC result context."""
|
|
|
|
value: AccountInfo
|
|
|
|
|
|
@dataclass
|
|
class SubscriptionNotificationBase:
|
|
"""Base class for RPC subscription notifications."""
|
|
|
|
subscription: int
|
|
result: Any
|
|
|
|
|
|
@dataclass
|
|
class AccountNotification(SubscriptionNotificationBase):
|
|
"""Account subscription notification."""
|
|
|
|
result: AccountInfoAndContext
|
|
|
|
|
|
@dataclass
|
|
class LogItem(TransactionErr):
|
|
"""Container for logs from logSubscribe."""
|
|
|
|
signature: TransactionSignature
|
|
logs: Optional[List[str]]
|
|
|
|
|
|
@dataclass
|
|
class LogItemAndContext(WithContext):
|
|
"""Log item with RPC result context."""
|
|
|
|
value: LogItem
|
|
|
|
|
|
@dataclass
|
|
class LogsNotification(SubscriptionNotificationBase):
|
|
"""Logs subscription notification."""
|
|
|
|
result: LogItemAndContext
|
|
|
|
|
|
@dataclass
|
|
class ProgramAccount:
|
|
"""Program account pubkey and account info."""
|
|
|
|
pubkey: PublicKey
|
|
account: AccountInfo
|
|
|
|
|
|
@dataclass
|
|
class ProgramAccountAndContext(WithContext):
|
|
"""Program subscription data with RPC result context."""
|
|
|
|
value: ProgramAccount
|
|
|
|
|
|
@dataclass
|
|
class ProgramNotification(SubscriptionNotificationBase):
|
|
"""Program subscription notification."""
|
|
|
|
result: ProgramAccountAndContext
|
|
|
|
|
|
@dataclass
|
|
class SignatureErrAndContext(WithContext):
|
|
"""Signature subscription error info with RPC result context."""
|
|
|
|
value: TransactionErr
|
|
|
|
|
|
@dataclass
|
|
class SignatureNotification(SubscriptionNotificationBase):
|
|
"""Signature subscription notification."""
|
|
|
|
result: SignatureErrAndContext
|
|
|
|
|
|
@dataclass
|
|
class SlotBase:
|
|
"""Base class for slot container."""
|
|
|
|
slot: int
|
|
|
|
|
|
@dataclass
|
|
class SlotInfo(SlotBase):
|
|
"""Slot info."""
|
|
|
|
parent: int
|
|
root: int
|
|
|
|
|
|
@dataclass
|
|
class SlotNotification(SubscriptionNotificationBase):
|
|
"""Slot subscription notification."""
|
|
|
|
result: SlotInfo
|
|
|
|
|
|
@dataclass
|
|
class RootNotification(SubscriptionNotificationBase):
|
|
"""Root subscription notification."""
|
|
|
|
result: int
|
|
|
|
|
|
@dataclass
|
|
class SlotAndTimestampBase(SlotBase):
|
|
"""Base class for a slot with timestamp."""
|
|
|
|
timestamp: int
|
|
|
|
|
|
@dataclass
|
|
class FirstShredReceived(SlotAndTimestampBase):
|
|
"""First shread received update."""
|
|
|
|
type: Literal["firstShredReceived"]
|
|
|
|
|
|
@dataclass
|
|
class Completed(SlotAndTimestampBase):
|
|
"""Slot completed update."""
|
|
|
|
type: Literal["completed"]
|
|
|
|
|
|
@dataclass
|
|
class CreatedBank(SlotAndTimestampBase):
|
|
"""Created bank update."""
|
|
|
|
parent: int
|
|
type: Literal["createdBank"]
|
|
|
|
|
|
@dataclass
|
|
class SlotTransactionStats:
|
|
"""Slot transaction stats."""
|
|
|
|
num_transaction_entries: int = field(metadata=alias("numTransactionEntries"))
|
|
num_successful_transactions: int = field(metadata=alias("numSuccessfulTransactions"))
|
|
num_failed_transactions: int = field(metadata=alias("numFailedTransactions"))
|
|
max_transactions_per_entry: int = field(metadata=alias("maxTransactionsPerEntry"))
|
|
|
|
|
|
@dataclass
|
|
class Frozen(SlotAndTimestampBase):
|
|
"""Slot frozen update."""
|
|
|
|
stats: SlotTransactionStats
|
|
type: Literal["frozen"]
|
|
|
|
|
|
@dataclass
|
|
class Dead(SlotAndTimestampBase):
|
|
"""Dead slot update."""
|
|
|
|
err: str
|
|
type: Literal["dead"]
|
|
|
|
|
|
@dataclass
|
|
class OptimisticConfirmation(SlotAndTimestampBase):
|
|
"""Optimistic confirmation update."""
|
|
|
|
type: Literal["optimisticConfirmation"]
|
|
|
|
|
|
@dataclass
|
|
class Root(SlotAndTimestampBase):
|
|
"""Root update."""
|
|
|
|
type: Literal["root"]
|
|
|
|
|
|
SlotsUpdatesItem = Union[FirstShredReceived, Completed, CreatedBank, Frozen, Dead, OptimisticConfirmation, Root]
|
|
|
|
|
|
@dataclass
|
|
class SlotsUpdatesNotification(SubscriptionNotificationBase):
|
|
"""Slots updates notification."""
|
|
|
|
result: SlotsUpdatesItem
|
|
|
|
|
|
@dataclass
|
|
class VoteItem:
|
|
"""Vote data."""
|
|
|
|
hash: str
|
|
slots: List[int]
|
|
timestamp: Optional[int]
|
|
|
|
|
|
@dataclass
|
|
class VoteNotification(SubscriptionNotificationBase):
|
|
"""Vote update notification."""
|
|
|
|
result: VoteItem
|
|
|
|
|
|
SubscriptionNotification = Union[
|
|
AccountNotification,
|
|
LogsNotification,
|
|
ProgramNotification,
|
|
SignatureNotification,
|
|
SlotNotification,
|
|
RootNotification,
|
|
SlotsUpdatesNotification,
|
|
VoteNotification,
|
|
]
|