97 lines
2.9 KiB
Python
97 lines
2.9 KiB
Python
""" The Observable Monad
|
|
|
|
* https://www.youtube.com/watch?v=looJcaeboBY
|
|
* https://wiki.haskell.org/MonadCont_under_the_hood
|
|
* http://blog.sigfpe.com/2008/12/mother-of-all-monads.html
|
|
* http://www.haskellforall.com/2012/12/the-continuation-monad.html
|
|
|
|
"""
|
|
|
|
from typing import Any, Callable, TypeVar, Generic
|
|
|
|
from .util import identity, compose
|
|
from .typing import Monad, Functor
|
|
|
|
TSource = TypeVar("TSource")
|
|
TResult = TypeVar("TResult")
|
|
|
|
|
|
class Observable(Generic[TSource]):
|
|
|
|
"""The Rx Observable Monad.
|
|
|
|
The Rx Observable monad is based on the Continuation monad
|
|
representing suspended computations in continuation-passing style
|
|
(CPS).
|
|
"""
|
|
|
|
def __init__(self, subscribe: Callable[[Callable], Any]) -> None:
|
|
"""Observable constructor.
|
|
|
|
Keyword arguments:
|
|
subscribe -- A callable that takes a callable (on_next)
|
|
"""
|
|
self._get_value = lambda: subscribe
|
|
|
|
@classmethod
|
|
def unit(cls, x: TSource) -> 'Observable[TSource]':
|
|
"""x -> Observable x"""
|
|
return cls(lambda on_next: on_next(x))
|
|
just = unit
|
|
|
|
def map(self, mapper: Callable[[TSource], TResult]) -> 'Observable[TResult]':
|
|
r"""Map a function over an observable.
|
|
|
|
Haskell: fmap f m = Cont $ \c -> runCont m (c . f)
|
|
"""
|
|
source = self
|
|
return Observable(lambda on_next: source.subscribe(compose(on_next, mapper)))
|
|
|
|
def bind(self, fn: Callable[[TSource], 'Observable[TResult]']) -> 'Observable[TResult]':
|
|
r"""Chain continuation passing functions.
|
|
|
|
Haskell: m >>= k = Cont $ \c -> runCont m $ \a -> runCont (k a) c
|
|
"""
|
|
source = self
|
|
return Observable(lambda on_next: source.subscribe(lambda a: fn(a).subscribe(on_next)))
|
|
flat_map = bind
|
|
|
|
def filter(self, predicate: Callable[[TSource], bool]) -> 'Observable[TSource]':
|
|
"""Filter the on_next continuation functions"""
|
|
source = self
|
|
|
|
def subscribe(on_next):
|
|
def _next(x):
|
|
if predicate(x):
|
|
on_next(x)
|
|
|
|
return source.subscribe(_next)
|
|
return Observable(subscribe)
|
|
|
|
@staticmethod
|
|
def call_cc(fn: Callable) -> 'Observable':
|
|
r"""call-with-current-continuation.
|
|
|
|
Haskell: callCC f = Cont $ \c -> runCont (f (\a -> Cont $ \_ -> c a )) c
|
|
"""
|
|
def subscribe(on_next):
|
|
return fn(lambda a: Observable(lambda _: on_next(a))).subscribe(on_next)
|
|
|
|
return Observable(subscribe)
|
|
|
|
def subscribe(self, on_next: Callable[[TSource], None]) -> Any:
|
|
return self._get_value()(on_next)
|
|
|
|
def __or__(self, func):
|
|
"""Use | as operator for bind.
|
|
|
|
Provide the | operator instead of the Haskell >>= operator
|
|
"""
|
|
return self.bind(func)
|
|
|
|
def __eq__(self, other) -> bool:
|
|
return self.subscribe(identity) == other.subscribe(identity)
|
|
|
|
|
|
assert(isinstance(Observable, Functor))
|
|
assert(isinstance(Observable, Monad))
|