blueshed.gust

Gust

PyPI - Version

Gust is a wrapper of tornado. It allows for a hello world such as:

from blueshed.gust import Gust, web

@web.get('/(.*)')
def get(request):
    """just a get"""
    return 'hello, world'

def main():
    """seperate construction from run"""
    Gust().run()

if __name__ == '__main__':
    main()

Similarly, you can write:

@web.ws_json_rpc('/websocket')
def add(a:float, b:float) -> float:
    """simple addition"""
    return a + b

And use a javascript websocket client to call the function:

const ws = new WebSocket("ws://localhost:8080/websocket");
ws.onopen = function () {
  ws.send(
    JSON.stringify({
      jsonrpc: "2.0",
      id: 1,
      method: "add",
      params: { a: 2.0, b: 2.0 }, // or [2.0, 2.0]
    }),
  );
};
ws.onmessage = function (evt) {
  const pre = document.createElement("pre");
  pre.textContent = evt.data;
  document.body.appendChild(pre);
};

Direct PostgreSQL Function Calling

Gust also supports calling PostgreSQL stored functions directly via WebSocket JSON-RPC:

from blueshed.gust import Gust, web, PostgresRPC
from psycopg import AsyncConnection

async def main():
    conn = await AsyncConnection.connect("postgresql://user:pass@localhost/mydb")
    pg_rpc = PostgresRPC(conn)
    web.ws_json_rpc('/api', handler=pg_rpc)
    Gust(port=8080).run()

Call any PostgreSQL function using the same JSON-RPC protocol shown above. An authenticated variant (AuthPostgresRPC) is available for automatic user injection. See samples/postgres_rpc_demo/ for complete examples.

There are simple sample apps in src/tests.

 1"""
 2.. include:: ../../../README.md
 3"""
 4
 5from .app import Gust
 6from .context import get_handler
 7from .postgres_rpc import AuthPostgresRPC, PostgresRPC
 8from .routes import Routes
 9from .static_file_handler import AuthStaticFileHandler
10from .stream import Stream
11from .utils import Redirect, stream
12from .web import web
13
14VERSION = '0.0.26'
15
16__all__ = [
17    'Gust',
18    'Routes',
19    'Redirect',
20    'stream',
21    'Stream',
22    'web',
23    'get_handler',
24    'AuthStaticFileHandler',
25    'PostgresRPC',
26    'AuthPostgresRPC',
27]
class Gust(tornado.web.Application):
 27class Gust(Application):
 28    """A minimal sub-class of tornado.web.Application"""
 29
 30    def __init__(
 31        self,
 32        routes: Optional[List] = None,
 33        port: Optional[int] = None,
 34        **kwargs,
 35    ):
 36        self.port: int = port if port else int(os.getenv('PORT', options.port))
 37        if 'debug' not in kwargs:
 38            kwargs['debug'] = options.debug
 39
 40        # WebSocket state per-instance (not shared across apps)
 41        self.ws_clients = defaultdict(list)
 42        self.ws_tasks = set()
 43
 44        super().__init__(routes, **kwargs)
 45        web.install(self)
 46
 47        # Warn about debug mode security implications
 48        if self.settings.get('debug') is True:
 49            log.warning(
 50                '⚠️  DEBUG MODE ENABLED - CORS HEADERS ARE WIDE OPEN AND '
 51                'WEBSOCKET ORIGIN CHECKING IS DISABLED. DO NOT USE IN PRODUCTION.'
 52            )
 53
 54    async def perform(self, handler, func: Callable, *args, **kwargs) -> Any:
 55        """await a function or call in a thread_pool, better yet call redis"""
 56        if inspect.iscoroutinefunction(func):
 57            log.debug('aperform: %s', func)
 58            with context.gust(handler):
 59                result = await func(*args, **kwargs)
 60        else:
 61            log.debug('perform: %s', func)
 62            partial = functools.partial(
 63                self.call_in_context, handler, func, args, kwargs
 64            )
 65            result = await asyncio.to_thread(partial)
 66        if result:
 67            if inspect.isawaitable(result):
 68                result = await result
 69            if inspect.isasyncgen(result):
 70                result = handler.create_stream(result)
 71        return result
 72
 73    def call_in_context(self, handler, func, args, kwargs):
 74        """set the context and call function"""
 75        with context.gust(handler):
 76            return func(*args, **kwargs)
 77
 78    def broadcast(self, path, message, client_ids=None):
 79        """Broadcast to WebSocket clients at a path"""
 80        Websocket.broadcast(self.ws_clients, path, message, client_ids)
 81
 82    async def _run_(self):  # pragma: no cover
 83        """listen on self.port and run io_loop"""
 84        if self.port:
 85            self.listen(self.port)
 86            log.info('listening on port: %s', self.port)
 87        else:
 88            log.warning("No 'port' in settings")
 89
 90        if self.settings.get('debug') is True:
 91            log.info('running in debug mode')
 92
 93        await asyncio.Event().wait()
 94
 95    def run(self):  # pragma: no cover
 96        try:
 97            asyncio.get_event_loop().run_until_complete(self._run_())
 98        except (KeyboardInterrupt, SystemExit):
 99            # graceful shutdown
100            pass
101
102    async def on_line(self, handler: Websocket):
103        """called when a websocket opens"""
104
105    async def off_line(self, handler: Websocket):
106        """called when a websocket closes"""
107
108    def to_json(self, value: dict) -> str:
109        """called by websocket and webhandler"""
110        return self.settings.get('json_encoder', json_utils.dumps)(value)

A minimal sub-class of tornado.web.Application

Gust(routes: Optional[List] = None, port: Optional[int] = None, **kwargs)
30    def __init__(
31        self,
32        routes: Optional[List] = None,
33        port: Optional[int] = None,
34        **kwargs,
35    ):
36        self.port: int = port if port else int(os.getenv('PORT', options.port))
37        if 'debug' not in kwargs:
38            kwargs['debug'] = options.debug
39
40        # WebSocket state per-instance (not shared across apps)
41        self.ws_clients = defaultdict(list)
42        self.ws_tasks = set()
43
44        super().__init__(routes, **kwargs)
45        web.install(self)
46
47        # Warn about debug mode security implications
48        if self.settings.get('debug') is True:
49            log.warning(
50                '⚠️  DEBUG MODE ENABLED - CORS HEADERS ARE WIDE OPEN AND '
51                'WEBSOCKET ORIGIN CHECKING IS DISABLED. DO NOT USE IN PRODUCTION.'
52            )
port: int
ws_clients
ws_tasks
async def perform(self, handler, func: Callable, *args, **kwargs) -> Any:
54    async def perform(self, handler, func: Callable, *args, **kwargs) -> Any:
55        """await a function or call in a thread_pool, better yet call redis"""
56        if inspect.iscoroutinefunction(func):
57            log.debug('aperform: %s', func)
58            with context.gust(handler):
59                result = await func(*args, **kwargs)
60        else:
61            log.debug('perform: %s', func)
62            partial = functools.partial(
63                self.call_in_context, handler, func, args, kwargs
64            )
65            result = await asyncio.to_thread(partial)
66        if result:
67            if inspect.isawaitable(result):
68                result = await result
69            if inspect.isasyncgen(result):
70                result = handler.create_stream(result)
71        return result

await a function or call in a thread_pool, better yet call redis

def call_in_context(self, handler, func, args, kwargs):
73    def call_in_context(self, handler, func, args, kwargs):
74        """set the context and call function"""
75        with context.gust(handler):
76            return func(*args, **kwargs)

set the context and call function

def broadcast(self, path, message, client_ids=None):
78    def broadcast(self, path, message, client_ids=None):
79        """Broadcast to WebSocket clients at a path"""
80        Websocket.broadcast(self.ws_clients, path, message, client_ids)

Broadcast to WebSocket clients at a path

def run(self):
 95    def run(self):  # pragma: no cover
 96        try:
 97            asyncio.get_event_loop().run_until_complete(self._run_())
 98        except (KeyboardInterrupt, SystemExit):
 99            # graceful shutdown
100            pass
async def on_line(self, handler: blueshed.gust.websocket.Websocket):
102    async def on_line(self, handler: Websocket):
103        """called when a websocket opens"""

called when a websocket opens

async def off_line(self, handler: blueshed.gust.websocket.Websocket):
105    async def off_line(self, handler: Websocket):
106        """called when a websocket closes"""

called when a websocket closes

def to_json(self, value: dict) -> str:
108    def to_json(self, value: dict) -> str:
109        """called by websocket and webhandler"""
110        return self.settings.get('json_encoder', json_utils.dumps)(value)

called by websocket and webhandler

class Routes:
 14class Routes:
 15    """to organise our decorators"""
 16
 17    def __init__(self) -> None:
 18        self.route_map = {}
 19        self.broadcaster: Websocket
 20
 21    def get(
 22        self,
 23        path,
 24        template=None,
 25        auth=False,
 26    ):
 27        """wrap a GET"""
 28        return self.default_wrap(
 29            method='get', path=path, template=template, auth=auth
 30        )
 31
 32    def post(
 33        self,
 34        path,
 35        template=None,
 36        auth=False,
 37    ):
 38        """wrap a POST"""
 39        return self.default_wrap(
 40            method='post', path=path, template=template, auth=auth
 41        )
 42
 43    def put(
 44        self,
 45        path,
 46        template=None,
 47        auth=False,
 48    ):
 49        """wrap a PUT"""
 50        return self.default_wrap(
 51            method='put', path=path, template=template, auth=auth
 52        )
 53
 54    def delete(
 55        self,
 56        path,
 57        template=None,
 58        auth=False,
 59    ):
 60        """wrap a DELETE"""
 61        return self.default_wrap(
 62            method='delete', path=path, template=template, auth=auth
 63        )
 64
 65    def head(
 66        self,
 67        path,
 68        template=None,
 69        auth=False,
 70    ):
 71        """wrap a HEAD"""
 72        return self.default_wrap(
 73            method='head', path=path, template=template, auth=auth
 74        )
 75
 76    def ws_open(
 77        self,
 78        path,
 79        auth=False,
 80    ):
 81        """wrap an on_open"""
 82        return self.default_wrap(
 83            method='ws_open', path=path, template=None, auth=auth
 84        )
 85
 86    def ws_message(
 87        self,
 88        path,
 89        auth=False,
 90    ):
 91        """wrap an on_message"""
 92        return self.default_wrap(
 93            method='ws_message', path=path, template=None, auth=auth
 94        )
 95
 96    def ws_close(
 97        self,
 98        path,
 99        auth=False,
100    ):
101        """wrap an on_close"""
102        return self.default_wrap(
103            method='ws_close', path=path, template=None, auth=auth
104        )
105
106    def ws_json_rpc(
107        self,
108        path,
109        auth=False,
110        handler=None,
111    ):
112        """
113        Wrap a json remote procedure endpoint.
114
115        Can be used in two ways:
116        1. With a function decorator:
117           @web.ws_json_rpc('/api')
118           async def my_rpc(method: str, params=None):
119               return await handler.call(method, params)
120
121        2. With a handler object:
122           web.ws_json_rpc('/api', handler=my_handler)
123           # Handler must implement: async def call(method, params)
124        """
125        return self.default_wrap(
126            method='ws_rpc',
127            path=path,
128            template=None,
129            auth=auth,
130            handler=handler,
131        )
132
133    def broadcast(self, path: str, message: str, client_ids: List[int] = None):
134        """broadcast to a path"""
135        self.broadcaster.broadcast(path, message, client_ids)
136
137    def default_wrap(
138        self,
139        method,
140        path,
141        template=None,
142        auth=False,
143        handler=None,
144    ):
145        """wrap a method"""
146
147        # If handler is provided directly (no decorator on function)
148        if handler is not None:
149            cfg = self.route_map.setdefault(path, WsConfig())
150            cfg.ws_rpc_handler = handler
151            if auth is True:
152                cfg.auth = True
153            return handler
154
155        def inner_decorator(func):
156            """we have the function"""
157            web_method = WebMethod(func=func, template=template, auth=auth)
158            if method.startswith('ws_'):
159                cfg = self.route_map.setdefault(path, WsConfig())
160                if method.startswith('ws_rpc'):
161                    cfg.ws_rpc[func.__name__] = web_method
162                else:
163                    setattr(cfg, method, web_method)
164                if auth is True:
165                    cfg.auth = True
166            else:
167                cfg = self.route_map.setdefault(path, WebConfig())
168                setattr(cfg, method, web_method)
169            return func
170
171        return inner_decorator
172
173    def install(self, app):
174        routes = []
175        for path, cfg in self.route_map.items():
176            if isinstance(cfg, WebConfig):
177                handler = WebHandler
178                init_dict = {'method_settings': cfg}
179            else:
180                handler = Websocket
181                init_dict = {
182                    'method_settings': cfg,
183                    'ws_clients': app.ws_clients,
184                    'ws_tasks': app.ws_tasks,
185                }
186            routes.append((rf'{path}', handler, init_dict))
187        routes.sort(reverse=True)
188        self.broadcaster = app
189        app.add_handlers('.*', routes)
190        return routes

to organise our decorators

route_map
broadcaster: blueshed.gust.websocket.Websocket
def get(self, path, template=None, auth=False):
21    def get(
22        self,
23        path,
24        template=None,
25        auth=False,
26    ):
27        """wrap a GET"""
28        return self.default_wrap(
29            method='get', path=path, template=template, auth=auth
30        )

wrap a GET

def post(self, path, template=None, auth=False):
32    def post(
33        self,
34        path,
35        template=None,
36        auth=False,
37    ):
38        """wrap a POST"""
39        return self.default_wrap(
40            method='post', path=path, template=template, auth=auth
41        )

wrap a POST

def put(self, path, template=None, auth=False):
43    def put(
44        self,
45        path,
46        template=None,
47        auth=False,
48    ):
49        """wrap a PUT"""
50        return self.default_wrap(
51            method='put', path=path, template=template, auth=auth
52        )

wrap a PUT

def delete(self, path, template=None, auth=False):
54    def delete(
55        self,
56        path,
57        template=None,
58        auth=False,
59    ):
60        """wrap a DELETE"""
61        return self.default_wrap(
62            method='delete', path=path, template=template, auth=auth
63        )

wrap a DELETE

def head(self, path, template=None, auth=False):
65    def head(
66        self,
67        path,
68        template=None,
69        auth=False,
70    ):
71        """wrap a HEAD"""
72        return self.default_wrap(
73            method='head', path=path, template=template, auth=auth
74        )

wrap a HEAD

def ws_open(self, path, auth=False):
76    def ws_open(
77        self,
78        path,
79        auth=False,
80    ):
81        """wrap an on_open"""
82        return self.default_wrap(
83            method='ws_open', path=path, template=None, auth=auth
84        )

wrap an on_open

def ws_message(self, path, auth=False):
86    def ws_message(
87        self,
88        path,
89        auth=False,
90    ):
91        """wrap an on_message"""
92        return self.default_wrap(
93            method='ws_message', path=path, template=None, auth=auth
94        )

wrap an on_message

def ws_close(self, path, auth=False):
 96    def ws_close(
 97        self,
 98        path,
 99        auth=False,
100    ):
101        """wrap an on_close"""
102        return self.default_wrap(
103            method='ws_close', path=path, template=None, auth=auth
104        )

wrap an on_close

def ws_json_rpc(self, path, auth=False, handler=None):
106    def ws_json_rpc(
107        self,
108        path,
109        auth=False,
110        handler=None,
111    ):
112        """
113        Wrap a json remote procedure endpoint.
114
115        Can be used in two ways:
116        1. With a function decorator:
117           @web.ws_json_rpc('/api')
118           async def my_rpc(method: str, params=None):
119               return await handler.call(method, params)
120
121        2. With a handler object:
122           web.ws_json_rpc('/api', handler=my_handler)
123           # Handler must implement: async def call(method, params)
124        """
125        return self.default_wrap(
126            method='ws_rpc',
127            path=path,
128            template=None,
129            auth=auth,
130            handler=handler,
131        )

Wrap a json remote procedure endpoint.

Can be used in two ways:

  1. With a function decorator: @web.ws_json_rpc('/api') async def my_rpc(method: str, params=None): return await handler.call(method, params)
  1. With a handler object: web.ws_json_rpc('/api', handler=my_handler) # Handler must implement: async def call(method, params)
def broadcast(self, path: str, message: str, client_ids: List[int] = None):
133    def broadcast(self, path: str, message: str, client_ids: List[int] = None):
134        """broadcast to a path"""
135        self.broadcaster.broadcast(path, message, client_ids)

broadcast to a path

def default_wrap(self, method, path, template=None, auth=False, handler=None):
137    def default_wrap(
138        self,
139        method,
140        path,
141        template=None,
142        auth=False,
143        handler=None,
144    ):
145        """wrap a method"""
146
147        # If handler is provided directly (no decorator on function)
148        if handler is not None:
149            cfg = self.route_map.setdefault(path, WsConfig())
150            cfg.ws_rpc_handler = handler
151            if auth is True:
152                cfg.auth = True
153            return handler
154
155        def inner_decorator(func):
156            """we have the function"""
157            web_method = WebMethod(func=func, template=template, auth=auth)
158            if method.startswith('ws_'):
159                cfg = self.route_map.setdefault(path, WsConfig())
160                if method.startswith('ws_rpc'):
161                    cfg.ws_rpc[func.__name__] = web_method
162                else:
163                    setattr(cfg, method, web_method)
164                if auth is True:
165                    cfg.auth = True
166            else:
167                cfg = self.route_map.setdefault(path, WebConfig())
168                setattr(cfg, method, web_method)
169            return func
170
171        return inner_decorator

wrap a method

def install(self, app):
173    def install(self, app):
174        routes = []
175        for path, cfg in self.route_map.items():
176            if isinstance(cfg, WebConfig):
177                handler = WebHandler
178                init_dict = {'method_settings': cfg}
179            else:
180                handler = Websocket
181                init_dict = {
182                    'method_settings': cfg,
183                    'ws_clients': app.ws_clients,
184                    'ws_tasks': app.ws_tasks,
185                }
186            routes.append((rf'{path}', handler, init_dict))
187        routes.sort(reverse=True)
188        self.broadcaster = app
189        app.add_handlers('.*', routes)
190        return routes
class Redirect(builtins.Exception):
18class Redirect(Exception):
19    """redirect to url and if cookie set it"""
20
21    def __init__(self, url='/'):
22        self.url = url
23        super().__init__()

redirect to url and if cookie set it

Redirect(url='/')
21    def __init__(self, url='/'):
22        self.url = url
23        super().__init__()
url
def stream(func):
112def stream(func):
113    """Decorator to stream results"""
114
115    @wraps(func)
116    async def wrapper(*args, **kwargs):
117        """Wrapper"""
118        if kwargs.get('stream_id', False):
119            stream_id = kwargs.pop('stream_id')
120        else:
121            stream_id = args[0]
122            args = args[1:]
123        log.debug('stream: %s %s %s - %s', func, args, kwargs, stream_id)
124        yield stream_id
125        async for item in func(*args, **kwargs):
126            yield item
127
128    return wrapper

Decorator to stream results

class Stream:
 6class Stream:
 7    """A special instance of a result after a result"""
 8
 9    def __init__(self, gen: AsyncGenerator) -> None:
10        self.id = str(uuid.uuid4())
11        self.gen = gen
12
13    def to_json(self) -> dict:
14        """We're just a response"""
15        return {'stream_id': self.id}

A special instance of a result after a result

Stream(gen: AsyncGenerator)
 9    def __init__(self, gen: AsyncGenerator) -> None:
10        self.id = str(uuid.uuid4())
11        self.gen = gen
id
gen
def to_json(self) -> dict:
13    def to_json(self) -> dict:
14        """We're just a response"""
15        return {'stream_id': self.id}

We're just a response

web = <Routes object>
def get_handler():
20def get_handler():
21    """get the current handler"""
22    return HANDLER.get(None)

get the current handler

class AuthStaticFileHandler(blueshed.gust.utils.UserMixin, tornado.web.StaticFileHandler):
11class AuthStaticFileHandler(UserMixin, StaticFileHandler):
12    """
13    This provide integration between tornado.web.authenticated
14    and tornado.web.StaticFileHandler.
15
16    It assumes you have set up the cookie name in the application
17    settings and that the request already has the cookie set. In
18    other words the user has already authenticated.
19    """
20
21    def initialize(self, allow: list = None, **kwargs):
22        """allow some paths through"""
23        super().initialize(**kwargs)
24        self.allow = allow if allow else []
25
26    def get(self, path, include_body=True):
27        """safe to return what you need"""
28        if self.current_user is None and path not in self.allow:
29            self.not_authenticated()
30        return StaticFileHandler.get(self, path, include_body)

This provide integration between tornado.web.authenticated and tornado.web.StaticFileHandler.

It assumes you have set up the cookie name in the application settings and that the request already has the cookie set. In other words the user has already authenticated.

def initialize(self, allow: list = None, **kwargs):
21    def initialize(self, allow: list = None, **kwargs):
22        """allow some paths through"""
23        super().initialize(**kwargs)
24        self.allow = allow if allow else []

Hook for subclass initialization. Called for each request.

A dictionary passed as the third argument of a URLSpec will be supplied as keyword arguments to initialize().

Example::

class ProfileHandler(RequestHandler):
    def initialize(self, database):
        self.database = database

    def get(self, username):
        ...

app = Application([
    (r'/user/(.*)', ProfileHandler, dict(database=database)),
    ])
def get(self, path, include_body=True):
26    def get(self, path, include_body=True):
27        """safe to return what you need"""
28        if self.current_user is None and path not in self.allow:
29            self.not_authenticated()
30        return StaticFileHandler.get(self, path, include_body)

safe to return what you need

class PostgresRPC:
 53class PostgresRPC:
 54    """
 55    Call PostgreSQL stored functions via JSON-RPC with automatic
 56    parameter marshalling from JSON-RPC format to PostgreSQL format.
 57
 58    Supports both positional (array) and named (object) parameters.
 59    Caches function signatures to avoid repeated schema queries.
 60    """
 61
 62    def __init__(
 63        self,
 64        connection,
 65        schema: str = 'public',
 66    ):
 67        """
 68        Initialize PostgreSQL RPC handler.
 69
 70        Args:
 71            connection: psycopg AsyncConnection or AsyncConnectionPool.
 72                Must support async context managers and cursor operations.
 73            schema: PostgreSQL schema to search for functions (default: 'public')
 74        """
 75        self.connection = connection
 76        self.schema = schema
 77
 78    async def call(
 79        self,
 80        method: str,
 81        params: Union[List, Dict, None] = None,
 82    ) -> Any:
 83        """
 84        Call a PostgreSQL stored function.
 85
 86        When params is a dict (named parameters), the function signature is
 87        queried to determine the correct parameter order, then params are
 88        marshalled to positional arguments.
 89
 90        When params is a list (positional parameters), they are used as-is.
 91
 92        Args:
 93            method: PostgreSQL function name (must not start with '_')
 94            params: JSON-RPC params as list (positional) or dict (named),
 95                    or None for no parameters
 96
 97        Returns:
 98            The first column of the first row returned by the function
 99
100        Raises:
101            ValueError: If function name starts with '_' (private convention)
102            ValueError: If function not found or parameters are invalid
103            Exception: If PostgreSQL execution fails
104        """
105        # Security: reject private functions (starting with underscore)
106        if method.startswith('_'):
107            raise ValueError(f'Cannot call private function: {method}')
108
109        # Normalize and validate params
110        if params is None:
111            params = []
112        elif isinstance(params, dict):
113            # Named parameters: need to look up function signature
114            signature = await self._get_function_signature(method)
115            if not signature:
116                raise ValueError(f'Function not found: {method}')
117
118            # Reorder params according to function signature
119            positional_params = []
120            for param_name in signature:
121                if param_name not in params:
122                    raise ValueError(
123                        f'Missing required parameter: {param_name} '
124                        f'for function {method}'
125                    )
126                positional_params.append(params[param_name])
127            params = positional_params
128        elif not isinstance(params, list):
129            raise ValueError(
130                'Params must be list (positional) or dict (named)'
131            )
132
133        # Build SQL: SELECT method(%s, %s, ...)
134        # Note: psycopg3 uses %s placeholders, not $1, $2
135        param_placeholders = ', '.join(['%s'] * len(params))
136        sql = f'SELECT {self.schema}.{method}({param_placeholders})'
137
138        log.debug('PostgreSQL RPC: %s with params %r', sql, params)
139
140        # Execute function with error handling
141        try:
142            async with self.connection.cursor() as cur:
143                await cur.execute(sql, params)
144                result = await cur.fetchone()
145            # Return first column of first row, or None
146            return result[0] if result else None
147        except Exception as e:
148            # Rollback failed transaction to reset connection state
149            try:
150                await self.connection.rollback()
151                log.info('Transaction rolled back after error')
152            except Exception as rollback_err:
153                log.error('Failed to rollback transaction: %s', rollback_err)
154            # Re-raise the original error with details
155            raise
156
157    async def _get_function_signature(
158        self, function_name: str
159    ) -> Optional[List[str]]:
160        """
161        Get function input parameter names in order.
162
163        Queries information_schema and caches the result to avoid
164        repeated schema lookups.
165
166        Args:
167            function_name: Name of the PostgreSQL function
168
169        Returns:
170            List of parameter names in order, or empty list if function
171            has no input parameters, or None if function not found
172        """
173        cache_key = f'{self.schema}.{function_name}'
174
175        if cache_key in _FUNCTION_SIGNATURE_CACHE:
176            return _FUNCTION_SIGNATURE_CACHE[cache_key]
177
178        # Query information_schema for function signature
179        async with self.connection.cursor() as cur:
180            await cur.execute(
181                """
182                SELECT array_agg(p.parameter_name ORDER BY p.ordinal_position)::text[]
183                FROM information_schema.parameters p
184                JOIN information_schema.routines r
185                  ON r.specific_schema = p.specific_schema
186                  AND r.specific_name = p.specific_name
187                WHERE r.routine_schema = %s
188                  AND r.routine_name = %s
189                  AND p.parameter_mode IN ('IN', 'INOUT')
190                """,
191                [self.schema, function_name],
192            )
193
194            result = await cur.fetchone()
195
196        if not result or not result[0]:
197            # Function not found or has no input parameters
198            signature = []
199        else:
200            signature = list(result[0])
201
202        # Cache result
203        _FUNCTION_SIGNATURE_CACHE[cache_key] = signature
204
205        log.debug(
206            'Cached function signature: %s.%s -> %r',
207            self.schema,
208            function_name,
209            signature,
210        )
211
212        return signature
213
214    @classmethod
215    def clear_cache(cls):
216        """Clear the global function signature cache."""
217        _FUNCTION_SIGNATURE_CACHE.clear()
218        log.debug('Cleared PostgreSQL function signature cache')

Call PostgreSQL stored functions via JSON-RPC with automatic parameter marshalling from JSON-RPC format to PostgreSQL format.

Supports both positional (array) and named (object) parameters. Caches function signatures to avoid repeated schema queries.

PostgresRPC(connection, schema: str = 'public')
62    def __init__(
63        self,
64        connection,
65        schema: str = 'public',
66    ):
67        """
68        Initialize PostgreSQL RPC handler.
69
70        Args:
71            connection: psycopg AsyncConnection or AsyncConnectionPool.
72                Must support async context managers and cursor operations.
73            schema: PostgreSQL schema to search for functions (default: 'public')
74        """
75        self.connection = connection
76        self.schema = schema

Initialize PostgreSQL RPC handler.

Args: connection: psycopg AsyncConnection or AsyncConnectionPool. Must support async context managers and cursor operations. schema: PostgreSQL schema to search for functions (default: 'public')

connection
schema
async def call(self, method: str, params: Union[List, Dict, NoneType] = None) -> Any:
 78    async def call(
 79        self,
 80        method: str,
 81        params: Union[List, Dict, None] = None,
 82    ) -> Any:
 83        """
 84        Call a PostgreSQL stored function.
 85
 86        When params is a dict (named parameters), the function signature is
 87        queried to determine the correct parameter order, then params are
 88        marshalled to positional arguments.
 89
 90        When params is a list (positional parameters), they are used as-is.
 91
 92        Args:
 93            method: PostgreSQL function name (must not start with '_')
 94            params: JSON-RPC params as list (positional) or dict (named),
 95                    or None for no parameters
 96
 97        Returns:
 98            The first column of the first row returned by the function
 99
100        Raises:
101            ValueError: If function name starts with '_' (private convention)
102            ValueError: If function not found or parameters are invalid
103            Exception: If PostgreSQL execution fails
104        """
105        # Security: reject private functions (starting with underscore)
106        if method.startswith('_'):
107            raise ValueError(f'Cannot call private function: {method}')
108
109        # Normalize and validate params
110        if params is None:
111            params = []
112        elif isinstance(params, dict):
113            # Named parameters: need to look up function signature
114            signature = await self._get_function_signature(method)
115            if not signature:
116                raise ValueError(f'Function not found: {method}')
117
118            # Reorder params according to function signature
119            positional_params = []
120            for param_name in signature:
121                if param_name not in params:
122                    raise ValueError(
123                        f'Missing required parameter: {param_name} '
124                        f'for function {method}'
125                    )
126                positional_params.append(params[param_name])
127            params = positional_params
128        elif not isinstance(params, list):
129            raise ValueError(
130                'Params must be list (positional) or dict (named)'
131            )
132
133        # Build SQL: SELECT method(%s, %s, ...)
134        # Note: psycopg3 uses %s placeholders, not $1, $2
135        param_placeholders = ', '.join(['%s'] * len(params))
136        sql = f'SELECT {self.schema}.{method}({param_placeholders})'
137
138        log.debug('PostgreSQL RPC: %s with params %r', sql, params)
139
140        # Execute function with error handling
141        try:
142            async with self.connection.cursor() as cur:
143                await cur.execute(sql, params)
144                result = await cur.fetchone()
145            # Return first column of first row, or None
146            return result[0] if result else None
147        except Exception as e:
148            # Rollback failed transaction to reset connection state
149            try:
150                await self.connection.rollback()
151                log.info('Transaction rolled back after error')
152            except Exception as rollback_err:
153                log.error('Failed to rollback transaction: %s', rollback_err)
154            # Re-raise the original error with details
155            raise

Call a PostgreSQL stored function.

When params is a dict (named parameters), the function signature is queried to determine the correct parameter order, then params are marshalled to positional arguments.

When params is a list (positional parameters), they are used as-is.

Args: method: PostgreSQL function name (must not start with '_') params: JSON-RPC params as list (positional) or dict (named), or None for no parameters

Returns: The first column of the first row returned by the function

Raises: ValueError: If function name starts with '_' (private convention) ValueError: If function not found or parameters are invalid Exception: If PostgreSQL execution fails

@classmethod
def clear_cache(cls):
214    @classmethod
215    def clear_cache(cls):
216        """Clear the global function signature cache."""
217        _FUNCTION_SIGNATURE_CACHE.clear()
218        log.debug('Cleared PostgreSQL function signature cache')

Clear the global function signature cache.

class AuthPostgresRPC(blueshed.gust.PostgresRPC):
221class AuthPostgresRPC(PostgresRPC):
222    """
223    Authenticated PostgreSQL RPC handler that automatically injects
224    the current user as the first parameter to all function calls.
225
226    This is useful for:
227    - Row-level security (functions filter by user)
228    - Audit trails (functions can log who made the change)
229    - Multi-tenant systems (functions can enforce tenant isolation)
230
231    The current user is obtained from the request context and must be
232    available via context.get_current_user().
233
234    Example PostgreSQL function signature:
235        CREATE FUNCTION get_user_orders(current_user_id INT, ...)
236            RETURNS TABLE(...)
237    """
238
239    async def call(
240        self,
241        method: str,
242        params: Union[List, Dict, None] = None,
243        require_auth: bool = True,
244    ) -> Any:
245        """
246        Call a PostgreSQL stored function with automatic user injection.
247
248        The current user from the request context is automatically prepended
249        as the first parameter.
250
251        Args:
252            method: PostgreSQL function name (must not start with '_')
253            params: JSON-RPC params as list (positional) or dict (named),
254                    or None for no parameters beyond the user
255            require_auth: If True, raise ValueError if no current user.
256                         If False, pass None as user if not authenticated.
257
258        Returns:
259            The first column of the first row returned by the function
260
261        Raises:
262            ValueError: If require_auth is True and no current user exists
263            ValueError: If function name starts with '_' (private convention)
264            Exception: If PostgreSQL execution fails
265        """
266        # Get current user from context
267        current_user = context.get_current_user()
268
269        if current_user is None and require_auth:
270            raise ValueError('Authentication required: no current user')
271
272        # Prepare params with user prepended
273        if params is None:
274            params_with_user = [current_user]
275        elif isinstance(params, list):
276            # Positional params: prepend user to the list
277            params_with_user = [current_user] + params
278        elif isinstance(params, dict):
279            # Named params: prepend user and let parent handle reordering
280            # The user will be first in the function signature
281            params_with_user = {**params, '_user': current_user}
282        else:
283            raise ValueError(
284                'Params must be list (positional) or dict (named)'
285            )
286
287        log.debug(
288            'AuthPostgresRPC: calling %s with user=%s, params=%r',
289            method,
290            current_user,
291            params,
292        )
293
294        # Call parent with user-injected params
295        return await super().call(method, params_with_user)

Authenticated PostgreSQL RPC handler that automatically injects the current user as the first parameter to all function calls.

This is useful for:

  • Row-level security (functions filter by user)
  • Audit trails (functions can log who made the change)
  • Multi-tenant systems (functions can enforce tenant isolation)

The current user is obtained from the request context and must be available via context.get_current_user().

Example PostgreSQL function signature: CREATE FUNCTION get_user_orders(current_user_id INT, ...) RETURNS TABLE(...)

async def call( self, method: str, params: Union[List, Dict, NoneType] = None, require_auth: bool = True) -> Any:
239    async def call(
240        self,
241        method: str,
242        params: Union[List, Dict, None] = None,
243        require_auth: bool = True,
244    ) -> Any:
245        """
246        Call a PostgreSQL stored function with automatic user injection.
247
248        The current user from the request context is automatically prepended
249        as the first parameter.
250
251        Args:
252            method: PostgreSQL function name (must not start with '_')
253            params: JSON-RPC params as list (positional) or dict (named),
254                    or None for no parameters beyond the user
255            require_auth: If True, raise ValueError if no current user.
256                         If False, pass None as user if not authenticated.
257
258        Returns:
259            The first column of the first row returned by the function
260
261        Raises:
262            ValueError: If require_auth is True and no current user exists
263            ValueError: If function name starts with '_' (private convention)
264            Exception: If PostgreSQL execution fails
265        """
266        # Get current user from context
267        current_user = context.get_current_user()
268
269        if current_user is None and require_auth:
270            raise ValueError('Authentication required: no current user')
271
272        # Prepare params with user prepended
273        if params is None:
274            params_with_user = [current_user]
275        elif isinstance(params, list):
276            # Positional params: prepend user to the list
277            params_with_user = [current_user] + params
278        elif isinstance(params, dict):
279            # Named params: prepend user and let parent handle reordering
280            # The user will be first in the function signature
281            params_with_user = {**params, '_user': current_user}
282        else:
283            raise ValueError(
284                'Params must be list (positional) or dict (named)'
285            )
286
287        log.debug(
288            'AuthPostgresRPC: calling %s with user=%s, params=%r',
289            method,
290            current_user,
291            params,
292        )
293
294        # Call parent with user-injected params
295        return await super().call(method, params_with_user)

Call a PostgreSQL stored function with automatic user injection.

The current user from the request context is automatically prepended as the first parameter.

Args: method: PostgreSQL function name (must not start with '_') params: JSON-RPC params as list (positional) or dict (named), or None for no parameters beyond the user require_auth: If True, raise ValueError if no current user. If False, pass None as user if not authenticated.

Returns: The first column of the first row returned by the function

Raises: ValueError: If require_auth is True and no current user exists ValueError: If function name starts with '_' (private convention) Exception: If PostgreSQL execution fails