Server - Introduction
RSocketServer is the high-level abstraction used to create a server running the RSocket protocol. It is a subclass of RSocket.
An RSocketServer can communicate with any RSocket client that is implemented against the same protocol version as the server and uses the same transport.
To get started creating an RSocket server, you will need to install the rsocket package,
and at least one transport protocol implementation (TCP available by default).
See the server portion of the Client Server Example for an example of an implemented getRequestHandler.
Transports
A transport is the abstraction that handles the underlying network communication portion of the RSocket application protocol.
Available network transports for the rsocket-py server include:
- TCP - available by default
- Websocket (aiohttp, Quart)
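To make "underlying network communication" concrete, here is a minimal sketch of one job a TCP transport has to do: delimit frames on a byte stream. The RSocket specification prefixes each frame with a 24-bit big-endian length when the transport does not provide framing of its own. The helper names (`encode_frame`, `read_frame`, `roundtrip`) are hypothetical and are not part of rsocket-py, whose transport classes do this work for you.

```python
import asyncio


def encode_frame(frame: bytes) -> bytes:
    """Prefix a frame with its length as a 3-byte big-endian integer.

    to_bytes raises OverflowError if the frame does not fit in 24 bits.
    """
    return len(frame).to_bytes(3, 'big') + frame


async def read_frame(reader: asyncio.StreamReader) -> bytes:
    """Read exactly one length-prefixed frame from the stream."""
    length = int.from_bytes(await reader.readexactly(3), 'big')
    return await reader.readexactly(length)


async def roundtrip():
    # An in-memory StreamReader stands in for a real TCP connection.
    reader = asyncio.StreamReader()
    reader.feed_data(encode_frame(b'first') + encode_frame(b'second'))
    reader.feed_eof()
    return await read_frame(reader), await read_frame(reader)


frames = asyncio.run(roundtrip())
```

A WebSocket transport would not need this step, since WebSocket messages already carry their own boundaries.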
RequestHandler
When creating an RSocketServer instance, the constructor requires a factory (function or class) that returns an object matching the RequestHandler abstract class.
This object is responsible for mapping callback/handler functions to the various RSocket message types,
and returning an appropriate Publisher/Future that will produce data for the request.
import asyncio
from typing import Tuple, Optional
from datetime import timedelta

from reactivestreams.publisher import Publisher
from reactivestreams.subscriber import Subscriber
from rsocket.payload import Payload
from rsocket.request_handler import RequestHandler
from rsocket.error_codes import ErrorCode


class CustomRequestHandler(RequestHandler):

    async def on_setup(self,
                       data_encoding: bytes,
                       metadata_encoding: bytes,
                       payload: Payload):
        ...

    async def request_channel(self, payload: Payload) -> Tuple[Optional[Publisher], Optional[Subscriber]]:
        ...

    async def request_fire_and_forget(self, payload: Payload):
        ...

    async def on_metadata_push(self, payload: Payload):
        ...

    async def request_response(self, payload: Payload) -> asyncio.Future:
        ...

    async def request_stream(self, payload: Payload) -> Publisher:
        ...

    async def on_error(self, error_code: ErrorCode, payload: Payload):
        ...

    async def on_keepalive_timeout(self,
                                   time_since_last_keepalive: timedelta,
                                   rsocket):
        ...

    async def on_connection_lost(self, rsocket, exception):
        ...
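The factory requirement can be illustrated without the library. The sketch below uses hypothetical stand-ins (`EchoHandler`, `make_handler`, `accept_connection`) rather than rsocket-py classes, and assumes the factory is called with no arguments. It only shows that a class and a function are interchangeable as factories, and that each connection ends up with its own handler object.

```python
from functools import partial
from typing import Callable


class EchoHandler:
    """Hypothetical stand-in for a RequestHandler implementation."""

    def __init__(self, greeting: str = 'hello'):
        self.greeting = greeting
        self.requests_seen = 0  # state that belongs to one connection

    def request_response(self, data: str) -> str:
        self.requests_seen += 1
        return f'{self.greeting} {data}'


def make_handler() -> EchoHandler:
    """A plain function works as a factory too."""
    return EchoHandler(greeting='hi')


def accept_connection(handler_factory: Callable[[], EchoHandler]) -> EchoHandler:
    """Stand-in for the server side: call the factory once per connection."""
    return handler_factory()


first = accept_connection(EchoHandler)                           # class as factory
second = accept_connection(make_handler)                         # function as factory
third = accept_connection(partial(EchoHandler, greeting='hey'))  # pre-bound arguments

first.request_response('a')  # only the first connection's counter changes
```

Passing a factory rather than a ready-made handler is what keeps per-connection state, such as `requests_seen` here, from leaking between clients.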
Client Cancellation
An important characteristic of RSocket's protocol is the concept of cancellation.
Once a client request has begun, the client that initiated it may decide that it no longer wishes to continue and signal the server to cancel.
When a client signals that it wishes to cancel a request, the server should avoid calling the onComplete or onNext callbacks for the request's resulting Single or Flowable instances.
Cancellation Request-Response Example
import asyncio
import logging
from asyncio import Future

from rsocket.payload import Payload
from rsocket.request_handler import BaseRequestHandler


async def calculate():
    try:
        await asyncio.sleep(4)
        return 'Response'
    except asyncio.CancelledError:
        logging.info('Canceled by client')
        raise


class CustomRequestHandler(BaseRequestHandler):

    async def request_response(self, payload: Payload) -> Future:
        return asyncio.ensure_future(calculate())
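The handler above relies on standard asyncio behaviour: cancelling the returned future raises `CancelledError` inside `calculate()`. The following sketch reproduces that with asyncio alone. `future.cancel()` stands in for the client's cancel signal, on the assumption that the server cancels the future when such a signal arrives. The `events` list and `simulate_client_cancel` are illustrative names.

```python
import asyncio

events = []  # records what happened, for illustration only


async def calculate() -> str:
    try:
        await asyncio.sleep(4)        # long-running work
        return 'Response'
    except asyncio.CancelledError:
        events.append('cancelled')    # cleanup or logging hook
        raise                         # re-raise so the future ends up cancelled


async def simulate_client_cancel() -> bool:
    future = asyncio.ensure_future(calculate())
    await asyncio.sleep(0.01)         # let calculate() start and reach its sleep
    future.cancel()                   # stand-in for the client's cancel signal
    try:
        await future
    except asyncio.CancelledError:
        pass                          # expected: the work was cancelled
    return future.cancelled()


was_cancelled = asyncio.run(simulate_client_cancel())
```

Re-raising `CancelledError` after cleanup matters: swallowing it would let the coroutine finish normally and produce a response for a request the client has already abandoned.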
Cancellation Request-Stream Example
from reactivestreams.publisher import Publisher
from reactivestreams.subscriber import Subscriber
from reactivestreams.subscription import Subscription
from rsocket.payload import Payload
from rsocket.request_handler import BaseRequestHandler


class CustomRequestHandler(BaseRequestHandler):

    class ResponseStream(Publisher, Subscription):

        def subscribe(self, subscriber: Subscriber):
            # Store the subscriber before signalling, so that a request()
            # issued from inside on_subscribe can already reach it.
            self.subscriber = subscriber
            subscriber.on_subscribe(self)

        async def request(self, n: int):
            # Emit up to n items to self.subscriber.
            ...

        def cancel(self):
            # Stop emitting; release any resources held by the stream.
            ...

    async def request_stream(self, payload: Payload) -> Publisher:
        return self.ResponseStream()
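What `request` and `cancel` might do is left open above. One possible shape is sketched below using plain Python stand-ins (`RecordingSubscriber`, `CountingStream`) instead of the reactivestreams base classes, with a synchronous `request` and a simplified `on_next` signature. These names and signatures are assumptions made for illustration only. The point is the cancel flag: once it is set, neither `on_next` nor `on_complete` is called again.

```python
from typing import List, Optional


class RecordingSubscriber:
    """Hypothetical subscriber that records the signals it receives."""

    def __init__(self):
        self.items: List[int] = []
        self.completed = False
        self.subscription = None

    def on_subscribe(self, subscription):
        self.subscription = subscription

    def on_next(self, value: int):
        self.items.append(value)

    def on_complete(self):
        self.completed = True


class CountingStream:
    """Hypothetical publisher/subscription emitting 0..limit-1 on demand."""

    def __init__(self, limit: int):
        self._limit = limit
        self._next = 0
        self._cancelled = False
        self._subscriber: Optional[RecordingSubscriber] = None

    def subscribe(self, subscriber: RecordingSubscriber):
        self._subscriber = subscriber
        subscriber.on_subscribe(self)

    def request(self, n: int):
        for _ in range(n):
            # Check the flag before every emission, not just once per call.
            if self._cancelled or self._next >= self._limit:
                return
            self._subscriber.on_next(self._next)
            self._next += 1
            if self._next == self._limit:
                self._subscriber.on_complete()

    def cancel(self):
        self._cancelled = True  # no on_next / on_complete after this point


subscriber = RecordingSubscriber()
CountingStream(limit=5).subscribe(subscriber)
subscriber.subscription.request(2)  # emits 0 and 1
subscriber.subscription.cancel()    # the client cancels
subscriber.subscription.request(3)  # ignored: the stream was cancelled
```

After these calls the subscriber holds only the two items delivered before the cancel, and it never receives a completion signal.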