blueshed.gust
Gust
Gust is a wrapper of tornado. It allows for a hello world such as:
from blueshed.gust import Gust, web
@web.get('/(.*)')
def get(request):
"""just a get"""
return 'hello, world'
def main():
"""seperate construction from run"""
Gust().run()
if __name__ == '__main__':
main()
Similarly, you can write:
@web.ws_json_rpc('/websocket')
def add(a:float, b:float) -> float:
"""simple addition"""
return a + b
And use a javascript websocket client to call the function:
const ws = new WebSocket("ws://localhost:8080/websocket");
ws.onopen = function () {
ws.send(
JSON.stringify({
jsonrpc: "2.0",
id: 1,
method: "add",
params: { a: 2.0, b: 2.0 }, // or [2.0, 2.0]
}),
);
};
ws.onmessage = function (evt) {
const pre = document.createElement("pre");
pre.textContent = evt.data;
document.body.appendChild(pre);
};
Direct PostgreSQL Function Calling
Gust also supports calling PostgreSQL stored functions directly via WebSocket JSON-RPC:
from blueshed.gust import Gust, web, PostgresRPC
from psycopg import AsyncConnection
async def main():
conn = await AsyncConnection.connect("postgresql://user:pass@localhost/mydb")
pg_rpc = PostgresRPC(conn)
web.ws_json_rpc('/api', handler=pg_rpc)
Gust(port=8080).run()
Call any PostgreSQL function using the same JSON-RPC protocol shown above. An authenticated variant (AuthPostgresRPC) is available for automatic user injection. See samples/postgres_rpc_demo/ for complete examples.
There are simple sample apps in src/tests.
1""" 2.. include:: ../../../README.md 3""" 4 5from .app import Gust 6from .context import get_handler 7from .postgres_rpc import AuthPostgresRPC, PostgresRPC 8from .routes import Routes 9from .static_file_handler import AuthStaticFileHandler 10from .stream import Stream 11from .utils import Redirect, stream 12from .web import web 13 14VERSION = '0.0.26' 15 16__all__ = [ 17 'Gust', 18 'Routes', 19 'Redirect', 20 'stream', 21 'Stream', 22 'web', 23 'get_handler', 24 'AuthStaticFileHandler', 25 'PostgresRPC', 26 'AuthPostgresRPC', 27]
27class Gust(Application): 28 """A minimal sub-class of tornado.web.Application""" 29 30 def __init__( 31 self, 32 routes: Optional[List] = None, 33 port: Optional[int] = None, 34 **kwargs, 35 ): 36 self.port: int = port if port else int(os.getenv('PORT', options.port)) 37 if 'debug' not in kwargs: 38 kwargs['debug'] = options.debug 39 40 # WebSocket state per-instance (not shared across apps) 41 self.ws_clients = defaultdict(list) 42 self.ws_tasks = set() 43 44 super().__init__(routes, **kwargs) 45 web.install(self) 46 47 # Warn about debug mode security implications 48 if self.settings.get('debug') is True: 49 log.warning( 50 '⚠️ DEBUG MODE ENABLED - CORS HEADERS ARE WIDE OPEN AND ' 51 'WEBSOCKET ORIGIN CHECKING IS DISABLED. DO NOT USE IN PRODUCTION.' 52 ) 53 54 async def perform(self, handler, func: Callable, *args, **kwargs) -> Any: 55 """await a function or call in a thread_pool, better yet call redis""" 56 if inspect.iscoroutinefunction(func): 57 log.debug('aperform: %s', func) 58 with context.gust(handler): 59 result = await func(*args, **kwargs) 60 else: 61 log.debug('perform: %s', func) 62 partial = functools.partial( 63 self.call_in_context, handler, func, args, kwargs 64 ) 65 result = await asyncio.to_thread(partial) 66 if result: 67 if inspect.isawaitable(result): 68 result = await result 69 if inspect.isasyncgen(result): 70 result = handler.create_stream(result) 71 return result 72 73 def call_in_context(self, handler, func, args, kwargs): 74 """set the context and call function""" 75 with context.gust(handler): 76 return func(*args, **kwargs) 77 78 def broadcast(self, path, message, client_ids=None): 79 """Broadcast to WebSocket clients at a path""" 80 Websocket.broadcast(self.ws_clients, path, message, client_ids) 81 82 async def _run_(self): # pragma: no cover 83 """listen on self.port and run io_loop""" 84 if self.port: 85 self.listen(self.port) 86 log.info('listening on port: %s', self.port) 87 else: 88 log.warning("No 'port' in settings") 89 90 if self.settings.get('debug') is True: 91 log.info('running in debug mode') 92 93 await asyncio.Event().wait() 94 95 def run(self): # pragma: no cover 96 try: 97 asyncio.get_event_loop().run_until_complete(self._run_()) 98 except (KeyboardInterrupt, SystemExit): 99 # graceful shutdown 100 pass 101 102 async def on_line(self, handler: Websocket): 103 """called when a websocket opens""" 104 105 async def off_line(self, handler: Websocket): 106 """called when a websocket closes""" 107 108 def to_json(self, value: dict) -> str: 109 """called by websocket and webhandler""" 110 return self.settings.get('json_encoder', json_utils.dumps)(value)
A minimal sub-class of tornado.web.Application
30 def __init__( 31 self, 32 routes: Optional[List] = None, 33 port: Optional[int] = None, 34 **kwargs, 35 ): 36 self.port: int = port if port else int(os.getenv('PORT', options.port)) 37 if 'debug' not in kwargs: 38 kwargs['debug'] = options.debug 39 40 # WebSocket state per-instance (not shared across apps) 41 self.ws_clients = defaultdict(list) 42 self.ws_tasks = set() 43 44 super().__init__(routes, **kwargs) 45 web.install(self) 46 47 # Warn about debug mode security implications 48 if self.settings.get('debug') is True: 49 log.warning( 50 '⚠️ DEBUG MODE ENABLED - CORS HEADERS ARE WIDE OPEN AND ' 51 'WEBSOCKET ORIGIN CHECKING IS DISABLED. DO NOT USE IN PRODUCTION.' 52 )
54 async def perform(self, handler, func: Callable, *args, **kwargs) -> Any: 55 """await a function or call in a thread_pool, better yet call redis""" 56 if inspect.iscoroutinefunction(func): 57 log.debug('aperform: %s', func) 58 with context.gust(handler): 59 result = await func(*args, **kwargs) 60 else: 61 log.debug('perform: %s', func) 62 partial = functools.partial( 63 self.call_in_context, handler, func, args, kwargs 64 ) 65 result = await asyncio.to_thread(partial) 66 if result: 67 if inspect.isawaitable(result): 68 result = await result 69 if inspect.isasyncgen(result): 70 result = handler.create_stream(result) 71 return result
await a function or call in a thread_pool, better yet call redis
73 def call_in_context(self, handler, func, args, kwargs): 74 """set the context and call function""" 75 with context.gust(handler): 76 return func(*args, **kwargs)
set the context and call function
78 def broadcast(self, path, message, client_ids=None): 79 """Broadcast to WebSocket clients at a path""" 80 Websocket.broadcast(self.ws_clients, path, message, client_ids)
Broadcast to WebSocket clients at a path
14class Routes: 15 """to organise our decorators""" 16 17 def __init__(self) -> None: 18 self.route_map = {} 19 self.broadcaster: Websocket 20 21 def get( 22 self, 23 path, 24 template=None, 25 auth=False, 26 ): 27 """wrap a GET""" 28 return self.default_wrap( 29 method='get', path=path, template=template, auth=auth 30 ) 31 32 def post( 33 self, 34 path, 35 template=None, 36 auth=False, 37 ): 38 """wrap a POST""" 39 return self.default_wrap( 40 method='post', path=path, template=template, auth=auth 41 ) 42 43 def put( 44 self, 45 path, 46 template=None, 47 auth=False, 48 ): 49 """wrap a PUT""" 50 return self.default_wrap( 51 method='put', path=path, template=template, auth=auth 52 ) 53 54 def delete( 55 self, 56 path, 57 template=None, 58 auth=False, 59 ): 60 """wrap a DELETE""" 61 return self.default_wrap( 62 method='delete', path=path, template=template, auth=auth 63 ) 64 65 def head( 66 self, 67 path, 68 template=None, 69 auth=False, 70 ): 71 """wrap a HEAD""" 72 return self.default_wrap( 73 method='head', path=path, template=template, auth=auth 74 ) 75 76 def ws_open( 77 self, 78 path, 79 auth=False, 80 ): 81 """wrap an on_open""" 82 return self.default_wrap( 83 method='ws_open', path=path, template=None, auth=auth 84 ) 85 86 def ws_message( 87 self, 88 path, 89 auth=False, 90 ): 91 """wrap an on_message""" 92 return self.default_wrap( 93 method='ws_message', path=path, template=None, auth=auth 94 ) 95 96 def ws_close( 97 self, 98 path, 99 auth=False, 100 ): 101 """wrap an on_close""" 102 return self.default_wrap( 103 method='ws_close', path=path, template=None, auth=auth 104 ) 105 106 def ws_json_rpc( 107 self, 108 path, 109 auth=False, 110 handler=None, 111 ): 112 """ 113 Wrap a json remote procedure endpoint. 114 115 Can be used in two ways: 116 1. With a function decorator: 117 @web.ws_json_rpc('/api') 118 async def my_rpc(method: str, params=None): 119 return await handler.call(method, params) 120 121 2. With a handler object: 122 web.ws_json_rpc('/api', handler=my_handler) 123 # Handler must implement: async def call(method, params) 124 """ 125 return self.default_wrap( 126 method='ws_rpc', 127 path=path, 128 template=None, 129 auth=auth, 130 handler=handler, 131 ) 132 133 def broadcast(self, path: str, message: str, client_ids: List[int] = None): 134 """broadcast to a path""" 135 self.broadcaster.broadcast(path, message, client_ids) 136 137 def default_wrap( 138 self, 139 method, 140 path, 141 template=None, 142 auth=False, 143 handler=None, 144 ): 145 """wrap a method""" 146 147 # If handler is provided directly (no decorator on function) 148 if handler is not None: 149 cfg = self.route_map.setdefault(path, WsConfig()) 150 cfg.ws_rpc_handler = handler 151 if auth is True: 152 cfg.auth = True 153 return handler 154 155 def inner_decorator(func): 156 """we have the function""" 157 web_method = WebMethod(func=func, template=template, auth=auth) 158 if method.startswith('ws_'): 159 cfg = self.route_map.setdefault(path, WsConfig()) 160 if method.startswith('ws_rpc'): 161 cfg.ws_rpc[func.__name__] = web_method 162 else: 163 setattr(cfg, method, web_method) 164 if auth is True: 165 cfg.auth = True 166 else: 167 cfg = self.route_map.setdefault(path, WebConfig()) 168 setattr(cfg, method, web_method) 169 return func 170 171 return inner_decorator 172 173 def install(self, app): 174 routes = [] 175 for path, cfg in self.route_map.items(): 176 if isinstance(cfg, WebConfig): 177 handler = WebHandler 178 init_dict = {'method_settings': cfg} 179 else: 180 handler = Websocket 181 init_dict = { 182 'method_settings': cfg, 183 'ws_clients': app.ws_clients, 184 'ws_tasks': app.ws_tasks, 185 } 186 routes.append((rf'{path}', handler, init_dict)) 187 routes.sort(reverse=True) 188 self.broadcaster = app 189 app.add_handlers('.*', routes) 190 return routes
to organise our decorators
21 def get( 22 self, 23 path, 24 template=None, 25 auth=False, 26 ): 27 """wrap a GET""" 28 return self.default_wrap( 29 method='get', path=path, template=template, auth=auth 30 )
wrap a GET
32 def post( 33 self, 34 path, 35 template=None, 36 auth=False, 37 ): 38 """wrap a POST""" 39 return self.default_wrap( 40 method='post', path=path, template=template, auth=auth 41 )
wrap a POST
43 def put( 44 self, 45 path, 46 template=None, 47 auth=False, 48 ): 49 """wrap a PUT""" 50 return self.default_wrap( 51 method='put', path=path, template=template, auth=auth 52 )
wrap a PUT
54 def delete( 55 self, 56 path, 57 template=None, 58 auth=False, 59 ): 60 """wrap a DELETE""" 61 return self.default_wrap( 62 method='delete', path=path, template=template, auth=auth 63 )
wrap a DELETE
65 def head( 66 self, 67 path, 68 template=None, 69 auth=False, 70 ): 71 """wrap a HEAD""" 72 return self.default_wrap( 73 method='head', path=path, template=template, auth=auth 74 )
wrap a HEAD
76 def ws_open( 77 self, 78 path, 79 auth=False, 80 ): 81 """wrap an on_open""" 82 return self.default_wrap( 83 method='ws_open', path=path, template=None, auth=auth 84 )
wrap an on_open
86 def ws_message( 87 self, 88 path, 89 auth=False, 90 ): 91 """wrap an on_message""" 92 return self.default_wrap( 93 method='ws_message', path=path, template=None, auth=auth 94 )
wrap an on_message
96 def ws_close( 97 self, 98 path, 99 auth=False, 100 ): 101 """wrap an on_close""" 102 return self.default_wrap( 103 method='ws_close', path=path, template=None, auth=auth 104 )
wrap an on_close
106 def ws_json_rpc( 107 self, 108 path, 109 auth=False, 110 handler=None, 111 ): 112 """ 113 Wrap a json remote procedure endpoint. 114 115 Can be used in two ways: 116 1. With a function decorator: 117 @web.ws_json_rpc('/api') 118 async def my_rpc(method: str, params=None): 119 return await handler.call(method, params) 120 121 2. With a handler object: 122 web.ws_json_rpc('/api', handler=my_handler) 123 # Handler must implement: async def call(method, params) 124 """ 125 return self.default_wrap( 126 method='ws_rpc', 127 path=path, 128 template=None, 129 auth=auth, 130 handler=handler, 131 )
Wrap a json remote procedure endpoint.
Can be used in two ways:
- With a function decorator: @web.ws_json_rpc('/api') async def my_rpc(method: str, params=None): return await handler.call(method, params)
- With a handler object: web.ws_json_rpc('/api', handler=my_handler) # Handler must implement: async def call(method, params)
133 def broadcast(self, path: str, message: str, client_ids: List[int] = None): 134 """broadcast to a path""" 135 self.broadcaster.broadcast(path, message, client_ids)
broadcast to a path
137 def default_wrap( 138 self, 139 method, 140 path, 141 template=None, 142 auth=False, 143 handler=None, 144 ): 145 """wrap a method""" 146 147 # If handler is provided directly (no decorator on function) 148 if handler is not None: 149 cfg = self.route_map.setdefault(path, WsConfig()) 150 cfg.ws_rpc_handler = handler 151 if auth is True: 152 cfg.auth = True 153 return handler 154 155 def inner_decorator(func): 156 """we have the function""" 157 web_method = WebMethod(func=func, template=template, auth=auth) 158 if method.startswith('ws_'): 159 cfg = self.route_map.setdefault(path, WsConfig()) 160 if method.startswith('ws_rpc'): 161 cfg.ws_rpc[func.__name__] = web_method 162 else: 163 setattr(cfg, method, web_method) 164 if auth is True: 165 cfg.auth = True 166 else: 167 cfg = self.route_map.setdefault(path, WebConfig()) 168 setattr(cfg, method, web_method) 169 return func 170 171 return inner_decorator
wrap a method
173 def install(self, app): 174 routes = [] 175 for path, cfg in self.route_map.items(): 176 if isinstance(cfg, WebConfig): 177 handler = WebHandler 178 init_dict = {'method_settings': cfg} 179 else: 180 handler = Websocket 181 init_dict = { 182 'method_settings': cfg, 183 'ws_clients': app.ws_clients, 184 'ws_tasks': app.ws_tasks, 185 } 186 routes.append((rf'{path}', handler, init_dict)) 187 routes.sort(reverse=True) 188 self.broadcaster = app 189 app.add_handlers('.*', routes) 190 return routes
18class Redirect(Exception): 19 """redirect to url and if cookie set it""" 20 21 def __init__(self, url='/'): 22 self.url = url 23 super().__init__()
redirect to url and if cookie set it
112def stream(func): 113 """Decorator to stream results""" 114 115 @wraps(func) 116 async def wrapper(*args, **kwargs): 117 """Wrapper""" 118 if kwargs.get('stream_id', False): 119 stream_id = kwargs.pop('stream_id') 120 else: 121 stream_id = args[0] 122 args = args[1:] 123 log.debug('stream: %s %s %s - %s', func, args, kwargs, stream_id) 124 yield stream_id 125 async for item in func(*args, **kwargs): 126 yield item 127 128 return wrapper
Decorator to stream results
6class Stream: 7 """A special instance of a result after a result""" 8 9 def __init__(self, gen: AsyncGenerator) -> None: 10 self.id = str(uuid.uuid4()) 11 self.gen = gen 12 13 def to_json(self) -> dict: 14 """We're just a response""" 15 return {'stream_id': self.id}
A special instance of a result after a result
get the current handler
11class AuthStaticFileHandler(UserMixin, StaticFileHandler): 12 """ 13 This provide integration between tornado.web.authenticated 14 and tornado.web.StaticFileHandler. 15 16 It assumes you have set up the cookie name in the application 17 settings and that the request already has the cookie set. In 18 other words the user has already authenticated. 19 """ 20 21 def initialize(self, allow: list = None, **kwargs): 22 """allow some paths through""" 23 super().initialize(**kwargs) 24 self.allow = allow if allow else [] 25 26 def get(self, path, include_body=True): 27 """safe to return what you need""" 28 if self.current_user is None and path not in self.allow: 29 self.not_authenticated() 30 return StaticFileHandler.get(self, path, include_body)
This provide integration between tornado.web.authenticated and tornado.web.StaticFileHandler.
It assumes you have set up the cookie name in the application settings and that the request already has the cookie set. In other words the user has already authenticated.
21 def initialize(self, allow: list = None, **kwargs): 22 """allow some paths through""" 23 super().initialize(**kwargs) 24 self.allow = allow if allow else []
Hook for subclass initialization. Called for each request.
A dictionary passed as the third argument of a URLSpec will be
supplied as keyword arguments to initialize().
Example::
class ProfileHandler(RequestHandler):
def initialize(self, database):
self.database = database
def get(self, username):
...
app = Application([
(r'/user/(.*)', ProfileHandler, dict(database=database)),
])
53class PostgresRPC: 54 """ 55 Call PostgreSQL stored functions via JSON-RPC with automatic 56 parameter marshalling from JSON-RPC format to PostgreSQL format. 57 58 Supports both positional (array) and named (object) parameters. 59 Caches function signatures to avoid repeated schema queries. 60 """ 61 62 def __init__( 63 self, 64 connection, 65 schema: str = 'public', 66 ): 67 """ 68 Initialize PostgreSQL RPC handler. 69 70 Args: 71 connection: psycopg AsyncConnection or AsyncConnectionPool. 72 Must support async context managers and cursor operations. 73 schema: PostgreSQL schema to search for functions (default: 'public') 74 """ 75 self.connection = connection 76 self.schema = schema 77 78 async def call( 79 self, 80 method: str, 81 params: Union[List, Dict, None] = None, 82 ) -> Any: 83 """ 84 Call a PostgreSQL stored function. 85 86 When params is a dict (named parameters), the function signature is 87 queried to determine the correct parameter order, then params are 88 marshalled to positional arguments. 89 90 When params is a list (positional parameters), they are used as-is. 91 92 Args: 93 method: PostgreSQL function name (must not start with '_') 94 params: JSON-RPC params as list (positional) or dict (named), 95 or None for no parameters 96 97 Returns: 98 The first column of the first row returned by the function 99 100 Raises: 101 ValueError: If function name starts with '_' (private convention) 102 ValueError: If function not found or parameters are invalid 103 Exception: If PostgreSQL execution fails 104 """ 105 # Security: reject private functions (starting with underscore) 106 if method.startswith('_'): 107 raise ValueError(f'Cannot call private function: {method}') 108 109 # Normalize and validate params 110 if params is None: 111 params = [] 112 elif isinstance(params, dict): 113 # Named parameters: need to look up function signature 114 signature = await self._get_function_signature(method) 115 if not signature: 116 raise ValueError(f'Function not found: {method}') 117 118 # Reorder params according to function signature 119 positional_params = [] 120 for param_name in signature: 121 if param_name not in params: 122 raise ValueError( 123 f'Missing required parameter: {param_name} ' 124 f'for function {method}' 125 ) 126 positional_params.append(params[param_name]) 127 params = positional_params 128 elif not isinstance(params, list): 129 raise ValueError( 130 'Params must be list (positional) or dict (named)' 131 ) 132 133 # Build SQL: SELECT method(%s, %s, ...) 134 # Note: psycopg3 uses %s placeholders, not $1, $2 135 param_placeholders = ', '.join(['%s'] * len(params)) 136 sql = f'SELECT {self.schema}.{method}({param_placeholders})' 137 138 log.debug('PostgreSQL RPC: %s with params %r', sql, params) 139 140 # Execute function with error handling 141 try: 142 async with self.connection.cursor() as cur: 143 await cur.execute(sql, params) 144 result = await cur.fetchone() 145 # Return first column of first row, or None 146 return result[0] if result else None 147 except Exception as e: 148 # Rollback failed transaction to reset connection state 149 try: 150 await self.connection.rollback() 151 log.info('Transaction rolled back after error') 152 except Exception as rollback_err: 153 log.error('Failed to rollback transaction: %s', rollback_err) 154 # Re-raise the original error with details 155 raise 156 157 async def _get_function_signature( 158 self, function_name: str 159 ) -> Optional[List[str]]: 160 """ 161 Get function input parameter names in order. 162 163 Queries information_schema and caches the result to avoid 164 repeated schema lookups. 165 166 Args: 167 function_name: Name of the PostgreSQL function 168 169 Returns: 170 List of parameter names in order, or empty list if function 171 has no input parameters, or None if function not found 172 """ 173 cache_key = f'{self.schema}.{function_name}' 174 175 if cache_key in _FUNCTION_SIGNATURE_CACHE: 176 return _FUNCTION_SIGNATURE_CACHE[cache_key] 177 178 # Query information_schema for function signature 179 async with self.connection.cursor() as cur: 180 await cur.execute( 181 """ 182 SELECT array_agg(p.parameter_name ORDER BY p.ordinal_position)::text[] 183 FROM information_schema.parameters p 184 JOIN information_schema.routines r 185 ON r.specific_schema = p.specific_schema 186 AND r.specific_name = p.specific_name 187 WHERE r.routine_schema = %s 188 AND r.routine_name = %s 189 AND p.parameter_mode IN ('IN', 'INOUT') 190 """, 191 [self.schema, function_name], 192 ) 193 194 result = await cur.fetchone() 195 196 if not result or not result[0]: 197 # Function not found or has no input parameters 198 signature = [] 199 else: 200 signature = list(result[0]) 201 202 # Cache result 203 _FUNCTION_SIGNATURE_CACHE[cache_key] = signature 204 205 log.debug( 206 'Cached function signature: %s.%s -> %r', 207 self.schema, 208 function_name, 209 signature, 210 ) 211 212 return signature 213 214 @classmethod 215 def clear_cache(cls): 216 """Clear the global function signature cache.""" 217 _FUNCTION_SIGNATURE_CACHE.clear() 218 log.debug('Cleared PostgreSQL function signature cache')
Call PostgreSQL stored functions via JSON-RPC with automatic parameter marshalling from JSON-RPC format to PostgreSQL format.
Supports both positional (array) and named (object) parameters. Caches function signatures to avoid repeated schema queries.
62 def __init__( 63 self, 64 connection, 65 schema: str = 'public', 66 ): 67 """ 68 Initialize PostgreSQL RPC handler. 69 70 Args: 71 connection: psycopg AsyncConnection or AsyncConnectionPool. 72 Must support async context managers and cursor operations. 73 schema: PostgreSQL schema to search for functions (default: 'public') 74 """ 75 self.connection = connection 76 self.schema = schema
Initialize PostgreSQL RPC handler.
Args: connection: psycopg AsyncConnection or AsyncConnectionPool. Must support async context managers and cursor operations. schema: PostgreSQL schema to search for functions (default: 'public')
78 async def call( 79 self, 80 method: str, 81 params: Union[List, Dict, None] = None, 82 ) -> Any: 83 """ 84 Call a PostgreSQL stored function. 85 86 When params is a dict (named parameters), the function signature is 87 queried to determine the correct parameter order, then params are 88 marshalled to positional arguments. 89 90 When params is a list (positional parameters), they are used as-is. 91 92 Args: 93 method: PostgreSQL function name (must not start with '_') 94 params: JSON-RPC params as list (positional) or dict (named), 95 or None for no parameters 96 97 Returns: 98 The first column of the first row returned by the function 99 100 Raises: 101 ValueError: If function name starts with '_' (private convention) 102 ValueError: If function not found or parameters are invalid 103 Exception: If PostgreSQL execution fails 104 """ 105 # Security: reject private functions (starting with underscore) 106 if method.startswith('_'): 107 raise ValueError(f'Cannot call private function: {method}') 108 109 # Normalize and validate params 110 if params is None: 111 params = [] 112 elif isinstance(params, dict): 113 # Named parameters: need to look up function signature 114 signature = await self._get_function_signature(method) 115 if not signature: 116 raise ValueError(f'Function not found: {method}') 117 118 # Reorder params according to function signature 119 positional_params = [] 120 for param_name in signature: 121 if param_name not in params: 122 raise ValueError( 123 f'Missing required parameter: {param_name} ' 124 f'for function {method}' 125 ) 126 positional_params.append(params[param_name]) 127 params = positional_params 128 elif not isinstance(params, list): 129 raise ValueError( 130 'Params must be list (positional) or dict (named)' 131 ) 132 133 # Build SQL: SELECT method(%s, %s, ...) 134 # Note: psycopg3 uses %s placeholders, not $1, $2 135 param_placeholders = ', '.join(['%s'] * len(params)) 136 sql = f'SELECT {self.schema}.{method}({param_placeholders})' 137 138 log.debug('PostgreSQL RPC: %s with params %r', sql, params) 139 140 # Execute function with error handling 141 try: 142 async with self.connection.cursor() as cur: 143 await cur.execute(sql, params) 144 result = await cur.fetchone() 145 # Return first column of first row, or None 146 return result[0] if result else None 147 except Exception as e: 148 # Rollback failed transaction to reset connection state 149 try: 150 await self.connection.rollback() 151 log.info('Transaction rolled back after error') 152 except Exception as rollback_err: 153 log.error('Failed to rollback transaction: %s', rollback_err) 154 # Re-raise the original error with details 155 raise
Call a PostgreSQL stored function.
When params is a dict (named parameters), the function signature is queried to determine the correct parameter order, then params are marshalled to positional arguments.
When params is a list (positional parameters), they are used as-is.
Args: method: PostgreSQL function name (must not start with '_') params: JSON-RPC params as list (positional) or dict (named), or None for no parameters
Returns: The first column of the first row returned by the function
Raises: ValueError: If function name starts with '_' (private convention) ValueError: If function not found or parameters are invalid Exception: If PostgreSQL execution fails
221class AuthPostgresRPC(PostgresRPC): 222 """ 223 Authenticated PostgreSQL RPC handler that automatically injects 224 the current user as the first parameter to all function calls. 225 226 This is useful for: 227 - Row-level security (functions filter by user) 228 - Audit trails (functions can log who made the change) 229 - Multi-tenant systems (functions can enforce tenant isolation) 230 231 The current user is obtained from the request context and must be 232 available via context.get_current_user(). 233 234 Example PostgreSQL function signature: 235 CREATE FUNCTION get_user_orders(current_user_id INT, ...) 236 RETURNS TABLE(...) 237 """ 238 239 async def call( 240 self, 241 method: str, 242 params: Union[List, Dict, None] = None, 243 require_auth: bool = True, 244 ) -> Any: 245 """ 246 Call a PostgreSQL stored function with automatic user injection. 247 248 The current user from the request context is automatically prepended 249 as the first parameter. 250 251 Args: 252 method: PostgreSQL function name (must not start with '_') 253 params: JSON-RPC params as list (positional) or dict (named), 254 or None for no parameters beyond the user 255 require_auth: If True, raise ValueError if no current user. 256 If False, pass None as user if not authenticated. 257 258 Returns: 259 The first column of the first row returned by the function 260 261 Raises: 262 ValueError: If require_auth is True and no current user exists 263 ValueError: If function name starts with '_' (private convention) 264 Exception: If PostgreSQL execution fails 265 """ 266 # Get current user from context 267 current_user = context.get_current_user() 268 269 if current_user is None and require_auth: 270 raise ValueError('Authentication required: no current user') 271 272 # Prepare params with user prepended 273 if params is None: 274 params_with_user = [current_user] 275 elif isinstance(params, list): 276 # Positional params: prepend user to the list 277 params_with_user = [current_user] + params 278 elif isinstance(params, dict): 279 # Named params: prepend user and let parent handle reordering 280 # The user will be first in the function signature 281 params_with_user = {**params, '_user': current_user} 282 else: 283 raise ValueError( 284 'Params must be list (positional) or dict (named)' 285 ) 286 287 log.debug( 288 'AuthPostgresRPC: calling %s with user=%s, params=%r', 289 method, 290 current_user, 291 params, 292 ) 293 294 # Call parent with user-injected params 295 return await super().call(method, params_with_user)
Authenticated PostgreSQL RPC handler that automatically injects the current user as the first parameter to all function calls.
This is useful for:
- Row-level security (functions filter by user)
- Audit trails (functions can log who made the change)
- Multi-tenant systems (functions can enforce tenant isolation)
The current user is obtained from the request context and must be available via context.get_current_user().
Example PostgreSQL function signature: CREATE FUNCTION get_user_orders(current_user_id INT, ...) RETURNS TABLE(...)
239 async def call( 240 self, 241 method: str, 242 params: Union[List, Dict, None] = None, 243 require_auth: bool = True, 244 ) -> Any: 245 """ 246 Call a PostgreSQL stored function with automatic user injection. 247 248 The current user from the request context is automatically prepended 249 as the first parameter. 250 251 Args: 252 method: PostgreSQL function name (must not start with '_') 253 params: JSON-RPC params as list (positional) or dict (named), 254 or None for no parameters beyond the user 255 require_auth: If True, raise ValueError if no current user. 256 If False, pass None as user if not authenticated. 257 258 Returns: 259 The first column of the first row returned by the function 260 261 Raises: 262 ValueError: If require_auth is True and no current user exists 263 ValueError: If function name starts with '_' (private convention) 264 Exception: If PostgreSQL execution fails 265 """ 266 # Get current user from context 267 current_user = context.get_current_user() 268 269 if current_user is None and require_auth: 270 raise ValueError('Authentication required: no current user') 271 272 # Prepare params with user prepended 273 if params is None: 274 params_with_user = [current_user] 275 elif isinstance(params, list): 276 # Positional params: prepend user to the list 277 params_with_user = [current_user] + params 278 elif isinstance(params, dict): 279 # Named params: prepend user and let parent handle reordering 280 # The user will be first in the function signature 281 params_with_user = {**params, '_user': current_user} 282 else: 283 raise ValueError( 284 'Params must be list (positional) or dict (named)' 285 ) 286 287 log.debug( 288 'AuthPostgresRPC: calling %s with user=%s, params=%r', 289 method, 290 current_user, 291 params, 292 ) 293 294 # Call parent with user-injected params 295 return await super().call(method, params_with_user)
Call a PostgreSQL stored function with automatic user injection.
The current user from the request context is automatically prepended as the first parameter.
Args: method: PostgreSQL function name (must not start with '_') params: JSON-RPC params as list (positional) or dict (named), or None for no parameters beyond the user require_auth: If True, raise ValueError if no current user. If False, pass None as user if not authenticated.
Returns: The first column of the first row returned by the function
Raises: ValueError: If require_auth is True and no current user exists ValueError: If function name starts with '_' (private convention) Exception: If PostgreSQL execution fails