67 lines
1.8 KiB
Python
67 lines
1.8 KiB
Python
import json
|
|
from functools import partial
|
|
from typing import Any, Dict, Iterator, Tuple, Union
|
|
|
|
from . import id_generators
|
|
from .sentinels import NOID
|
|
from .utils import compose
|
|
|
|
|
|
def notification_pure(
|
|
method: str, params: Union[Dict[str, Any], Tuple[Any, ...]]
|
|
) -> Dict[str, Any]:
|
|
return {
|
|
"jsonrpc": "2.0",
|
|
"method": method,
|
|
**({"params": params} if params else {}),
|
|
}
|
|
|
|
|
|
def notification(
|
|
method: str, params: Union[Dict[str, Any], Tuple[Any, ...], None] = None
|
|
) -> Dict[str, Any]:
|
|
return notification_pure(method, params if params else ())
|
|
|
|
|
|
notification_json = compose(json.dumps, notification)
|
|
|
|
|
|
def request_pure(
|
|
id_generator: Iterator[Any],
|
|
method: str,
|
|
params: Union[Dict[str, Any], Tuple[Any, ...]],
|
|
id: Any,
|
|
) -> Dict[str, Any]:
|
|
return {
|
|
"jsonrpc": "2.0",
|
|
"method": method,
|
|
**(
|
|
{"params": list(params) if isinstance(params, tuple) else params}
|
|
if params
|
|
else {}
|
|
),
|
|
"id": id if id is not NOID else next(id_generator),
|
|
}
|
|
|
|
|
|
def request_impure(
|
|
id_generator: Iterator[Any],
|
|
method: str,
|
|
params: Union[Dict[str, Any], Tuple[Any, ...], None] = None,
|
|
id: Any = NOID,
|
|
) -> Dict[str, Any]:
|
|
return request_pure(
|
|
id_generator or id_generators.decimal(), method, params or (), id
|
|
)
|
|
|
|
|
|
request_natural = partial(request_impure, id_generators.decimal())
|
|
request_hex = partial(request_impure, id_generators.hexadecimal())
|
|
request_random = partial(request_impure, id_generators.random())
|
|
request_uuid = partial(request_impure, id_generators.uuid())
|
|
request = request_natural
|
|
|
|
request_json = compose(json.dumps, request_natural)
|
|
request_json_hex = compose(json.dumps, request_hex)
|
|
request_json_random = compose(json.dumps, request_random)
|
|
request_json_uuid = compose(json.dumps, request_uuid)
|