tokencrawler/.venv/lib/python3.9/site-packages/solana/rpc/providers/http.py
2022-03-17 22:16:30 +01:00

35 lines
1.3 KiB
Python

"""HTTP RPC Provider."""
from typing import Any
import requests
from ..types import RPCMethod, RPCResponse
from .base import BaseProvider
from .core import _HTTPProviderCore
from ...exceptions import handle_exceptions, SolanaRpcException
class HTTPProvider(BaseProvider, _HTTPProviderCore):
    """HTTP provider to interact with the http rpc endpoint.

    Request construction and response handling are delegated to
    ``_before_request`` / ``_after_request``, which are not defined in this
    class (presumably inherited from ``_HTTPProviderCore`` -- confirm). This
    class performs the synchronous transport using ``requests``.
    """

    def __str__(self) -> str:
        """String definition for HTTPProvider."""
        return f"HTTP RPC connection {self.endpoint_uri}"

    @handle_exceptions(SolanaRpcException, requests.exceptions.RequestException)
    def make_request(self, method: RPCMethod, *params: Any) -> RPCResponse:
        """Make an HTTP request to an http rpc endpoint.

        Args:
            method: RPC method to invoke.
            *params: Positional parameters forwarded with the RPC call.

        Returns:
            Whatever ``_after_request`` produces from the raw HTTP response.

        NOTE(review): the ``handle_exceptions`` decorator is given
        ``SolanaRpcException`` and ``requests.exceptions.RequestException``;
        presumably it re-raises the latter as the former -- confirm against
        its definition.
        """
        request_kwargs = self._before_request(method=method, params=params, is_async=False)
        raw_response = requests.post(**request_kwargs, timeout=self.timeout)
        return self._after_request(raw_response=raw_response, method=method)

    def is_connected(self) -> bool:
        """Health check.

        Sends a GET to ``self.health_uri`` and reports whether it succeeded.

        Returns:
            ``False`` if the request fails or returns an HTTP error status
            (the failure is logged); otherwise ``response.ok``.
        """
        try:
            # Bound the wait with the same timeout used by make_request:
            # requests never times out by default, so an unresponsive
            # endpoint would otherwise block this call indefinitely.
            response = requests.get(self.health_uri, timeout=self.timeout)
            response.raise_for_status()
        except (IOError, requests.HTTPError) as err:
            self.logger.error("Health check failed with error: %s", str(err))
            return False

        return response.ok