tokencrawler/.venv/lib/python3.9/site-packages/solana/transaction.py
2022-03-17 22:16:30 +01:00

461 lines
19 KiB
Python

"""Library to package an atomic sequence of instructions to a transaction."""
from __future__ import annotations
from dataclasses import dataclass
from sys import maxsize
from typing import Any, Dict, List, NamedTuple, NewType, Optional, Union
from based58 import b58decode, b58encode
from nacl.exceptions import BadSignatureError # type: ignore
from nacl.signing import VerifyKey # type: ignore
from solana.blockhash import Blockhash
from solana.keypair import Keypair
from solana.message import CompiledInstruction, Message, MessageArgs, MessageHeader
from solana.publickey import PublicKey
from solana.utils import shortvec_encoding as shortvec
# Distinct static type for transaction signatures; created with `NewType` over `str`.
TransactionSignature = NewType("TransactionSignature", str)
"""Type for TransactionSignature."""
# NOTE(review): the upstream Solana SDK appears to derive this as the IPv6 minimum MTU (1280)
# minus the IPv6 header (40) minus the fragment header (8) - confirm against the SDK docs.
# `Transaction` refuses to serialize anything longer than this many bytes.
PACKET_DATA_SIZE = 1280 - 40 - 8
"""Constant for maximum over-the-wire size of a Transaction."""
# Byte length of one signature slot; used below both to validate signatures and to
# slice them out of a serialized transaction.
SIG_LENGTH = 64
"""Constant for standard length of a signature."""
@dataclass
class AccountMeta:
    """Describe how a single account takes part in an instruction.

    The dataclass is mutable: `Transaction.compile_message` adjusts the
    `is_signer` / `is_writable` flags while merging duplicate accounts.
    """

    pubkey: PublicKey
    """Public key identifying the account."""

    is_signer: bool
    """Whether the instruction needs a transaction signature that matches `pubkey`."""

    is_writable: bool
    """Whether `pubkey` may be loaded as a read-write account."""
class TransactionInstruction(NamedTuple):
    """A single program invocation: the accounts it touches, the program, and its input."""

    keys: List[AccountMeta]
    """Accounts referenced by the instruction; each entry's flags state whether
    that pubkey must sign the transaction and whether it is writable.
    """

    program_id: PublicKey
    """Id of the program that executes the instruction."""

    data: bytes = b""
    """Raw input handed to the program (empty by default)."""
class NonceInformation(NamedTuple):
    """Durable-nonce details consumed when building a Transaction."""

    nonce: Blockhash
    """Current nonce blockhash; `Transaction.compile_message` uses it as the recent blockhash."""

    nonce_instruction: TransactionInstruction
    """The AdvanceNonceAccount instruction; it is placed first in the transaction."""
@dataclass
class SigPubkeyPair:
    """A signer's public key together with its signature, once one exists.

    `signature` stays `None` until the signer has actually signed; the
    transaction code treats a missing signature as "not yet signed".
    """

    pubkey: PublicKey
    signature: Optional[bytes] = None
class Transaction:
    """Transaction class to represent an atomic transaction.

    Args:
        recent_blockhash: A recent transaction id.
        nonce_info: Nonce information.
            If populated, transaction will use a durable Nonce hash instead of a `recent_blockhash`.
        signatures: Signatures for the transaction.
            Typically created by invoking the `sign()` method.
        fee_payer: The transaction fee payer.
    """

    # Default (empty) signature
    __DEFAULT_SIG = bytes(64)

    def __init__(
        self,
        recent_blockhash: Optional[Blockhash] = None,
        nonce_info: Optional[NonceInformation] = None,
        signatures: Optional[List[SigPubkeyPair]] = None,
        fee_payer: Optional[PublicKey] = None,
    ) -> None:
        """Init transaction object."""
        self.fee_payer = fee_payer
        self.instructions: List[TransactionInstruction] = []
        self.signatures: List[SigPubkeyPair] = signatures if signatures else []
        self.recent_blockhash, self.nonce_info = recent_blockhash, nonce_info

    def __eq__(self, other: Any) -> bool:
        """Equality definition for Transactions.

        Two transactions are equal when their blockhash, nonce information,
        signatures and instructions are equal. `fee_payer` is not compared.
        """
        if not isinstance(other, Transaction):
            return False
        return (
            self.recent_blockhash == other.recent_blockhash
            and self.nonce_info == other.nonce_info
            and self.signatures == other.signatures
            and self.instructions == other.instructions
        )

    def signature(self) -> Optional[bytes]:
        """The first (payer) Transaction signature.

        Returns:
            The payer signature, or `None` when the transaction has no
            signature entries (or the first entry is still unsigned).
        """
        return None if not self.signatures else self.signatures[0].signature

    def add(self, *args: Union[Transaction, TransactionInstruction]) -> Transaction:
        """Add one or more instructions to this Transaction.

        Args:
            *args: The instructions to add to this Transaction.
                If a `Transaction` is passed, the instructions will be extracted from it.

        Returns:
            The transaction with the added instructions.

        Raises:
            ValueError: if an argument is neither a `Transaction` nor a `TransactionInstruction`.
        """
        for arg in args:
            if isinstance(arg, Transaction):
                self.instructions.extend(arg.instructions)
            elif isinstance(arg, TransactionInstruction):
                self.instructions.append(arg)
            else:
                raise ValueError("invalid instruction:", arg)
        return self

    def compile_message(self) -> Message:  # pylint: disable=too-many-locals
        """Compile transaction data.

        When `nonce_info` is set and its instruction is not already first, the
        nonce instruction is prepended and the nonce becomes the recent
        blockhash. The instructions' own `AccountMeta` objects are never
        modified; de-duplication works on copies.

        Returns:
            The compiled message.

        Raises:
            AttributeError: if there is no recent blockhash, no instruction, no
                fee payer, or an instruction without a program id.
        """
        nonce_info = self.nonce_info
        # Check for an empty instruction list first so that a nonce-only transaction
        # gets its nonce instruction prepended instead of failing with IndexError.
        if nonce_info and (not self.instructions or self.instructions[0] != nonce_info.nonce_instruction):
            self.recent_blockhash = nonce_info.nonce
            self.instructions = [nonce_info.nonce_instruction] + self.instructions

        if not self.recent_blockhash:
            raise AttributeError("transaction recentBlockhash required")
        if not self.instructions:
            raise AttributeError("no instructions provided")

        fee_payer = self.fee_payer
        if not fee_payer and len(self.signatures) > 0 and self.signatures[0].pubkey:
            # Use implicit fee payer
            fee_payer = self.signatures[0].pubkey
        if not fee_payer:
            raise AttributeError("transaction feePayer required")

        account_metas: List[AccountMeta] = []
        program_ids: List[str] = []
        for instr in self.instructions:
            if not instr.program_id:
                raise AttributeError("invalid instruction:", instr)
            account_metas.extend(instr.keys)
            if str(instr.program_id) not in program_ids:
                program_ids.append(str(instr.program_id))

        # Append programID account metas.
        for pg_id in program_ids:
            account_metas.append(AccountMeta(PublicKey(pg_id), False, False))

        # Sort, prioritizing first by signer, then by writable (the sort is stable).
        account_metas.sort(key=lambda account: (not account.is_signer, not account.is_writable))

        # Cull duplicate accounts
        fee_payer_idx = maxsize
        seen: Dict[str, int] = {}
        uniq_metas: List[AccountMeta] = []
        for sig in self.signatures:
            pubkey = str(sig.pubkey)
            if pubkey in seen:
                uniq_metas[seen[pubkey]].is_signer = True
            else:
                uniq_metas.append(AccountMeta(sig.pubkey, True, True))
                seen[pubkey] = len(uniq_metas) - 1
                if sig.pubkey == fee_payer:
                    fee_payer_idx = min(fee_payer_idx, seen[pubkey])

        for a_m in account_metas:
            pubkey = str(a_m.pubkey)
            if pubkey in seen:
                idx = seen[pubkey]
                uniq_metas[idx].is_writable = uniq_metas[idx].is_writable or a_m.is_writable
            else:
                # Store a copy: the flags of entries in `uniq_metas` are mutated here and
                # below, and that must not leak into the caller's instructions.
                uniq_metas.append(AccountMeta(a_m.pubkey, a_m.is_signer, a_m.is_writable))
                seen[pubkey] = len(uniq_metas) - 1
                if a_m.pubkey == fee_payer:
                    fee_payer_idx = min(fee_payer_idx, seen[pubkey])

        # Move fee payer to the front
        if fee_payer_idx == maxsize:
            uniq_metas = [AccountMeta(fee_payer, True, True)] + uniq_metas
        else:
            payer_meta = uniq_metas.pop(fee_payer_idx)
            # The fee payer has to end up among the signed keys (and first), whatever
            # flags the instructions gave it.
            payer_meta.is_signer = True
            payer_meta.is_writable = True
            uniq_metas.insert(0, payer_meta)

        # Split out signing from nonsigning keys and count readonlys
        signed_keys: List[str] = []
        unsigned_keys: List[str] = []
        num_required_signatures = num_readonly_signed_accounts = num_readonly_unsigned_accounts = 0
        for a_m in uniq_metas:
            if a_m.is_signer:
                signed_keys.append(str(a_m.pubkey))
                num_required_signatures += 1
                num_readonly_signed_accounts += int(not a_m.is_writable)
            else:
                num_readonly_unsigned_accounts += int(not a_m.is_writable)
                unsigned_keys.append(str(a_m.pubkey))

        # Initialize signature array, if needed
        if not self.signatures:
            self.signatures = [SigPubkeyPair(pubkey=PublicKey(key), signature=None) for key in signed_keys]

        account_keys: List[str] = signed_keys + unsigned_keys
        account_indices: Dict[str, int] = {str(key): i for i, key in enumerate(account_keys)}
        compiled_instructions: List[CompiledInstruction] = [
            CompiledInstruction(
                accounts=[account_indices[str(a_m.pubkey)] for a_m in instr.keys],
                program_id_index=account_indices[str(instr.program_id)],
                data=b58encode(instr.data),
            )
            for instr in self.instructions
        ]

        return Message(
            MessageArgs(
                header=MessageHeader(
                    num_required_signatures=num_required_signatures,
                    num_readonly_signed_accounts=num_readonly_signed_accounts,
                    num_readonly_unsigned_accounts=num_readonly_unsigned_accounts,
                ),
                account_keys=account_keys,
                instructions=compiled_instructions,
                recent_blockhash=self.recent_blockhash,
            )
        )

    def serialize_message(self) -> bytes:
        """Get raw transaction data that need to be covered by signatures.

        Returns:
            The serialized message.
        """
        return self.compile_message().serialize()

    def sign_partial(self, *partial_signers: Union[PublicKey, Keypair]) -> None:
        """Partially sign a Transaction with the specified accounts.

        The `Keypair` inputs will be used to sign the Transaction immediately, while any
        `PublicKey` inputs will be referenced in the signed Transaction but need to
        be filled in later by calling `add_signer()` with the matching `Keypair`.

        All the caveats from the `sign` method apply to `sign_partial`.

        Any signatures already stored on the transaction are replaced.

        Raises:
            RuntimeError: if a produced signature does not have `SIG_LENGTH` bytes.
        """

        def partial_signer_pubkey(account_or_pubkey: Union[PublicKey, Keypair]) -> PublicKey:
            return account_or_pubkey.public_key if isinstance(account_or_pubkey, Keypair) else account_or_pubkey

        signatures: List[SigPubkeyPair] = [
            SigPubkeyPair(pubkey=partial_signer_pubkey(partial_signer)) for partial_signer in partial_signers
        ]
        # The signer list must be in place before compiling: it determines the
        # account layout of the message that gets signed.
        self.signatures = signatures
        sign_data = self.serialize_message()

        for idx, partial_signer in enumerate(partial_signers):
            if isinstance(partial_signer, Keypair):
                sig = partial_signer.sign(sign_data).signature
                if len(sig) != SIG_LENGTH:
                    raise RuntimeError("signature has invalid length", sig)
                self.signatures[idx].signature = sig

    def sign(self, *signers: Keypair) -> None:
        """Sign the Transaction with the specified accounts.

        Multiple signatures may be applied to a Transaction. The first signature
        is considered "primary" and is used when testing for Transaction confirmation.

        Transaction fields should not be modified after the first call to `sign`,
        as doing so may invalidate the signature and cause the Transaction to be
        rejected.

        The Transaction must be assigned a valid `recent_blockhash` before invoking this method.
        """
        self.sign_partial(*signers)

    def add_signature(self, pubkey: PublicKey, signature: bytes) -> None:
        """Add an externally created signature to a transaction.

        Args:
            pubkey: Public key of a signer already listed in `signatures`.
            signature: The signature to store for `pubkey`.

        Raises:
            ValueError: if `signature` is not `SIG_LENGTH` bytes long, or `pubkey`
                is not one of the transaction's signers.
        """
        if len(signature) != SIG_LENGTH:
            raise ValueError("signature has invalid length", signature)
        idx = next((i for i, sig_pair in enumerate(self.signatures) if sig_pair.pubkey == pubkey), None)
        if idx is None:
            raise ValueError("unknown signer: ", str(pubkey))
        self.signatures[idx].signature = signature

    def add_signer(self, signer: Keypair) -> None:
        """Fill in a signature for a partially signed Transaction.

        The `signer` must be the corresponding `Keypair` for a `PublicKey` that was
        previously provided to `sign_partial`.
        """
        signed_msg = signer.sign(self.serialize_message())
        self.add_signature(signer.public_key, signed_msg.signature)

    def verify_signatures(self) -> bool:
        """Verify signatures of a complete, signed Transaction.

        Returns:
            a bool indicating if the signatures are correct or not.
        """
        return self.__verify_signatures(self.serialize_message())

    def __verify_signatures(self, signed_data: bytes) -> bool:
        """Return True only if every signature entry is present and verifies against `signed_data`."""
        for sig_pair in self.signatures:
            if not sig_pair.signature:
                return False
            try:
                VerifyKey(bytes(sig_pair.pubkey)).verify(signed_data, sig_pair.signature)
            except BadSignatureError:
                return False
        return True

    def serialize(self) -> bytes:
        """Serialize the Transaction in the wire format.

        The Transaction must have a valid `signature` before invoking this method.

        Example:

            >>> from solana.keypair import Keypair
            >>> from solana.blockhash import Blockhash
            >>> from solana.publickey import PublicKey
            >>> from solana.system_program import transfer, TransferParams
            >>> seed = bytes(PublicKey(1))
            >>> sender, receiver = Keypair.from_seed(seed), PublicKey(2)
            >>> transfer_tx = Transaction().add(transfer(TransferParams(from_pubkey=sender.public_key, to_pubkey=receiver, lamports=1000)))
            >>> transfer_tx.recent_blockhash = Blockhash(str(PublicKey(3)))
            >>> transfer_tx.sign(sender)
            >>> transfer_tx.serialize().hex()
            '019d53be8af3a7c30f86c1092d2c3ea61d270c0cfa275a23ba504674c8fbbb724827b23b42dc8e08019e23120f1b6f40f9799355ce54185b4415be37ca2cee6e0e010001034cb5abf6ad79fbf5abbccafcc269d85cd2651ed4b885b5869f241aedf0a5ba2900000000000000000000000000000000000000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000301020200010c02000000e803000000000000'

        Returns:
            The serialized transaction.

        Raises:
            AttributeError: if the transaction has no signatures or they do not verify.
            RuntimeError: if a signature has the wrong length or the result exceeds `PACKET_DATA_SIZE`.
        """  # noqa: E501 pylint: disable=line-too-long
        if not self.signatures:
            raise AttributeError("transaction has not been signed")

        sign_data = self.serialize_message()
        if not self.__verify_signatures(sign_data):
            raise AttributeError("transaction has not been signed correctly")

        return self.__serialize(sign_data)

    def __serialize(self, signed_data: bytes) -> bytes:
        """Assemble the wire format: signature count, the signatures, then `signed_data`."""
        if len(self.signatures) >= SIG_LENGTH * 4:
            raise AttributeError("too many signatures to encode")
        wire_transaction = bytearray()
        # Encode signature count
        signature_count = shortvec.encode_length(len(self.signatures))
        wire_transaction.extend(signature_count)
        # Encode signatures
        for sig_pair in self.signatures:
            if sig_pair.signature and len(sig_pair.signature) != SIG_LENGTH:
                raise RuntimeError("signature has invalid length", sig_pair.signature)

            if not sig_pair.signature:
                # A missing signature is written as an all-zero placeholder slot.
                wire_transaction.extend(bytearray(SIG_LENGTH))
            else:
                wire_transaction.extend(sig_pair.signature)
        # Encode signed data
        wire_transaction.extend(signed_data)

        if len(wire_transaction) > PACKET_DATA_SIZE:
            raise RuntimeError(f"transaction too large: {len(wire_transaction)} > {PACKET_DATA_SIZE}")

        return bytes(wire_transaction)

    @staticmethod
    def deserialize(raw_transaction: bytes) -> Transaction:
        """Parse a wire transaction into a Transaction object.

        Example:

            >>> raw_transaction = bytes.fromhex(
            ...     '019d53be8af3a7c30f86c1092d2c3ea61d270c0cfa2'
            ...     '75a23ba504674c8fbbb724827b23b42dc8e08019e23'
            ...     '120f1b6f40f9799355ce54185b4415be37ca2cee6e0'
            ...     'e010001034cb5abf6ad79fbf5abbccafcc269d85cd2'
            ...     '651ed4b885b5869f241aedf0a5ba290000000000000'
            ...     '0000000000000000000000000000000000000000000'
            ...     '0000000200000000000000000000000000000000000'
            ...     '0000000000000000000000000000000000000000000'
            ...     '0000000000000000000000000000000000000000000'
            ...     '000000301020200010c02000000e803000000000000'
            ... )
            >>> type(Transaction.deserialize(raw_transaction))
            <class 'solana.transaction.Transaction'>

        Returns:
            The deserialized transaction.
        """
        signatures = []
        signature_count, offset = shortvec.decode_length(raw_transaction)
        for _ in range(signature_count):
            # `populate` expects base58-encoded signatures, so encode each raw slot.
            signatures.append(b58encode(raw_transaction[offset : offset + SIG_LENGTH]))  # noqa: E203
            offset += SIG_LENGTH
        return Transaction.populate(Message.deserialize(raw_transaction[offset:]), signatures)

    @staticmethod
    def populate(message: Message, signatures: List[bytes]) -> Transaction:
        """Populate Transaction object from message and signatures.

        Args:
            message: The compiled message to rebuild instructions from.
            signatures: Base58-encoded signatures, matched by position with
                `message.account_keys`. The encoding of an all-zero signature
                is stored as "not signed" (`None`).

        Example:

            >>> raw_message = bytes.fromhex(
            ...     '0200030500000000000000000000000000000000000000000000'
            ...     '0000000000000000000100000000000000000000000000000000'
            ...     '0000000000000000000000000000000200000000000000000000'
            ...     '0000000000000000000000000000000000000000000300000000'
            ...     '0000000000000000000000000000000000000000000000000000'
            ...     '0004000000000000000000000000000000000000000000000000'
            ...     '0000000000000005c49ae77603782054f17a9decea43b444eba0'
            ...     'edb12c6f1d31c6e0e4a84bf052eb010403010203050909090909'
            ... )
            >>> from based58 import b58encode
            >>> from solana.message import Message
            >>> msg = Message.deserialize(raw_message)
            >>> signatures = [b58encode(bytes([1] * SIG_LENGTH)), b58encode(bytes([2] * SIG_LENGTH))]
            >>> type(Transaction.populate(msg, signatures))
            <class 'solana.transaction.Transaction'>

        Returns:
            The populated transaction.
        """
        transaction = Transaction(recent_blockhash=message.recent_blockhash)

        # Loop-invariant: encode the empty-signature marker once.
        default_sig = b58encode(Transaction.__DEFAULT_SIG)
        for idx, sig in enumerate(signatures):
            signature = None if sig == default_sig else b58decode(sig)
            transaction.signatures.append(SigPubkeyPair(pubkey=message.account_keys[idx], signature=signature))

        for instr in message.instructions:
            account_metas: List[AccountMeta] = []
            for acc_idx in instr.accounts:
                pubkey = message.account_keys[acc_idx]
                is_signer = any((pubkey == sigkeypair.pubkey for sigkeypair in transaction.signatures))
                account_metas.append(
                    AccountMeta(pubkey=pubkey, is_signer=is_signer, is_writable=message.is_account_writable(acc_idx))
                )
            program_id = message.account_keys[instr.program_id_index]
            transaction.instructions.append(
                TransactionInstruction(keys=account_metas, program_id=program_id, data=b58decode(instr.data))
            )

        return transaction