Server API Reference

Complete reference for PyFrame’s server and request-handling APIs.

Development Server

SimpleDevServer

SimpleDevServer is PyFrame’s built-in development server, with optional hot reload.

1
2
3
4
5
6
7
8
9
from pyframe.server import SimpleDevServer

# `app` is the PyFrameApp instance to serve; it is created elsewhere.
server = SimpleDevServer(
    app=app,
    host='localhost',  # documented default
    port=3000,         # documented default
    hot_reload=True    # documented default
)
# start() is a coroutine, so this line has to run inside an async function.
await server.start()

Constructor Parameters

app: PyFrameApp

The PyFrame application instance to serve.

host: str

Server host address. Default: 'localhost'

port: int

Server port number. Default: 3000

hot_reload: bool

Enable hot reload functionality. Default: True

debug: bool

Enable debug mode. Default: True

static_path: str

Path to static files directory. Default: 'static'

Methods

async start()

Start the development server.

Example:

1
2
# Only `app` is passed; the remaining constructor parameters fall back to
# their documented defaults.
server = SimpleDevServer(app)
await server.start()
async stop()

Stop the development server.

async restart()

Restart the server (useful for hot reload).

get_status() -> dict

Get server status information.

Returns:

  • dict - Server status including uptime, request count, etc.

Request Context

RequestContext

A RequestContext holds information about the current HTTP request. It is the `context` argument passed to route handlers and middleware.

1
2
3
4
5
@app.route('/users/<user_id>')
async def get_user(context):
    """Echo the ``user_id`` path parameter and the optional ``include`` query value."""
    path_args = context.params
    return {
        'user_id': path_args['user_id'],
        # .get() yields None when the query string carries no ``include``.
        'include': context.query_params.get('include'),
    }

Attributes

method: str

HTTP method (GET, POST, PUT, DELETE, etc.).

path: str

Request path without query parameters.

full_path: str

Full request path including query parameters.

params: dict

URL path parameters from route patterns.

query_params: dict

Query string parameters.

headers: dict

Request headers.

cookies: dict

Request cookies.

json: dict

Parsed JSON body (for POST/PUT requests).

form: dict

Parsed form data.

files: dict

Uploaded files.

raw_body: bytes

Raw request body.

user: User

Authenticated user (if available).

session: dict

User session data.

remote_addr: str

Client IP address.

user_agent: str

Client user agent string.

Methods

get_header(self, name, default=None) -> str

Get request header value.

Parameters:

  • name: str - Header name (case-insensitive)
  • default: Any - Default value if header not found

Returns:

  • str - Header value or default

Example:

1
2
# Header names are matched case-insensitively; the second argument is the
# fallback returned when the header is absent.
content_type = context.get_header('Content-Type', 'text/html')
# No fallback given, so a missing header yields the default of None.
auth_header = context.get_header('Authorization')
get_cookie(self, name, default=None) -> str

Get cookie value.

Parameters:

  • name: str - Cookie name
  • default: Any - Default value if cookie not found

Returns:

  • str - Cookie value or default
get_query_param(self, name, default=None) -> str

Get query parameter value.

Parameters:

  • name: str - Parameter name
  • default: Any - Default value if parameter not found

Returns:

  • str - Parameter value or default
is_ajax(self) -> bool

Check if request is an AJAX request.

Returns:

  • bool - True if AJAX request
is_json(self) -> bool

Check if request content type is JSON.

Returns:

  • bool - True if JSON content type
accepts(self, content_type) -> bool

Check if client accepts specific content type.

Parameters:

  • content_type: str - MIME type to check

Returns:

  • bool - True if content type is accepted

Example:

1
2
3
4
# Fragment from inside a route handler: choose the response representation
# based on whether the client accepts JSON.
if context.accepts('application/json'):
    return {'data': 'json response'}
else:
    return {'body': '<html>HTML response</html>'}

Response Builder

Creating Responses

Basic Response

1
2
3
4
5
6
7
@app.route('/hello')
async def hello(context):
    """Reply with a plain-text greeting."""
    plain_text = {'Content-Type': 'text/plain'}
    return {'status': 200, 'headers': plain_text, 'body': 'Hello, World!'}

JSON Response

1
2
3
4
5
6
7
8
@app.route('/api/users')
async def get_users(context):
    """Return every user, serialized as a JSON array."""
    serialized = [record.to_dict() for record in await User.all()]
    return {
        'status': 200,
        'headers': {'Content-Type': 'application/json'},
        'body': json.dumps(serialized),
    }

HTML Response

1
2
3
4
5
6
7
8
@app.route('/dashboard')
async def dashboard(context):
    """Render the Dashboard component for the current user as an HTML page."""
    page_html = Dashboard(user=context.user).render()
    return {
        'status': 200,
        'headers': {'Content-Type': 'text/html'},
        'body': page_html,
    }

Redirect Response

1
2
3
4
5
6
@app.route('/old-path')
async def redirect_old_path(context):
    """Redirect the old path to its replacement with a 302 response."""
    destination = '/new-path'
    return {'status': 302, 'headers': {'Location': destination}}

File Response

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@app.route('/download/<filename>')
async def download_file(context):
    """Send a file from ``/uploads`` to the client as an attachment.

    The ``filename`` route parameter comes straight from the URL, so it is
    validated before it touches the filesystem or is echoed into a header.

    Returns:
        A 404 response dict when the name is not a plain file name or no such
        regular file exists; otherwise a 200 response dict whose body is the
        file's bytes.
    """
    import os  # local import keeps this example self-contained

    filename = context.params['filename']

    # Accept bare file names only: no directory components (blocks ``../``
    # traversal) and no control characters (blocks CR/LF header injection).
    is_plain_name = (
        filename not in ('', '.', '..')
        and os.path.basename(filename) == filename
        and not any(ord(ch) < 32 or ord(ch) == 127 for ch in filename)
    )
    if not is_plain_name:
        return {'status': 404, 'body': 'File not found'}

    file_path = f'/uploads/{filename}'

    # isfile() also rejects directories, which open() could not read anyway.
    if not os.path.isfile(file_path):
        return {'status': 404, 'body': 'File not found'}

    with open(file_path, 'rb') as f:
        content = f.read()

    # Escape the characters that would end the quoted-string early.
    quoted_name = filename.replace('\\', '\\\\').replace('"', '\\"')

    return {
        'status': 200,
        'headers': {
            'Content-Type': 'application/octet-stream',
            'Content-Disposition': f'attachment; filename="{quoted_name}"',
            'Content-Length': str(len(content))
        },
        'body': content
    }

Response Helpers

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from pyframe.responses import JSONResponse, HTMLResponse, RedirectResponse, FileResponse

@app.route('/api/data')
async def api_data(context):
    """Return a small JSON payload via the JSONResponse helper."""
    return JSONResponse({'message': 'Hello, API!'}, status=200)

@app.route('/page')
async def page(context):
    """Return a static HTML heading via the HTMLResponse helper."""
    return HTMLResponse('<h1>Welcome</h1>')

@app.route('/redirect')
async def redirect(context):
    """Redirect to /destination with a 302 via the RedirectResponse helper."""
    target = '/destination'
    return RedirectResponse(target, status=302)

@app.route('/file')
async def serve_file(context):
    """Serve a PDF from disk under the download name ``download.pdf``."""
    source_path = '/path/to/file.pdf'
    return FileResponse(source_path, filename='download.pdf')

Middleware System

Middleware Interface

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
async def my_middleware(context, call_next):
    """Log each request and report its handling time in ``X-Response-Time``.

    Args:
        context: Request context; only ``context.path`` is read here.
        call_next: Awaitable callable that runs the next middleware/handler.

    Returns:
        The downstream response dict, with the timing header added.
    """
    # Pre-processing. perf_counter() is monotonic, so the measured duration
    # cannot be skewed (or go negative) if the system clock is adjusted
    # while the request is in flight.
    started = time.perf_counter()
    print(f"Request started: {context.path}")

    # Call next middleware/handler
    response = await call_next(context)

    # Post-processing
    duration = time.perf_counter() - started
    print(f"Request completed in {duration:.3f}s")

    # Modify response: create the headers dict if the handler omitted it.
    headers = response.setdefault('headers', {})
    headers['X-Response-Time'] = f"{duration:.3f}s"

    return response

# Register middleware
app.middleware.append(my_middleware)

Built-in Middleware

CORS Middleware

1
2
3
4
5
6
7
8
9
10
from pyframe.middleware import CORSMiddleware

# Browsers refuse wildcard origins/headers on credentialed requests, so when
# allow_credentials=True the trusted origins and headers are listed
# explicitly. Replace the example origin with your front-end's origin(s).
cors_middleware = CORSMiddleware(
    allow_origins=['https://app.example.com'],
    allow_methods=['GET', 'POST', 'PUT', 'DELETE'],
    allow_headers=['Content-Type', 'Authorization'],
    allow_credentials=True
)

app.middleware.append(cors_middleware)

Authentication Middleware

1
2
3
4
5
6
7
8
from pyframe.middleware import AuthMiddleware

auth_middleware = AuthMiddleware(
    # Placeholder value: supply the real key from configuration or the
    # environment rather than committing it to source control.
    secret_key='your-secret-key',
    # NOTE(review): presumably these paths bypass authentication; confirm
    # whether they are matched exactly or as prefixes.
    exclude_paths=['/login', '/register', '/public']
)

app.middleware.append(auth_middleware)

Rate Limiting Middleware

1
2
3
4
5
6
7
8
from pyframe.middleware import RateLimitMiddleware

# NOTE(review): how burst_size interacts with requests_per_minute, and what
# the limit is keyed on (client IP, user, ...), is not stated here; confirm.
rate_limit_middleware = RateLimitMiddleware(
    requests_per_minute=60,
    burst_size=10
)

app.middleware.append(rate_limit_middleware)

Compression Middleware

1
2
3
4
5
6
7
8
from pyframe.middleware import CompressionMiddleware

compression_middleware = CompressionMiddleware(
    minimum_size=1024,  # Only compress responses larger than 1KB
    # NOTE(review): presumably a zlib-style 0-9 level; confirm the range.
    compression_level=6
)

app.middleware.append(compression_middleware)

Custom Middleware Examples

Request ID Middleware

1
2
3
4
5
6
7
8
9
10
11
12
13
import uuid

async def request_id_middleware(context, call_next):
    """Tag the request with a fresh UUID4 and echo it in ``X-Request-ID``."""
    token = str(uuid.uuid4())
    # Exposed on the context so downstream handlers can read it.
    context.request_id = token

    response = await call_next(context)

    # setdefault() returns the existing headers dict or installs a new one.
    outgoing_headers = response.setdefault('headers', {})
    outgoing_headers['X-Request-ID'] = token
    return response

Security Headers Middleware

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
async def security_headers_middleware(context, call_next):
    """Stamp a fixed set of security headers onto the downstream response.

    Values set here replace any same-named headers the handler produced.
    """
    response = await call_next(context)

    outgoing_headers = response.setdefault('headers', {})
    for header_name, header_value in (
        ('X-Content-Type-Options', 'nosniff'),
        ('X-Frame-Options', 'DENY'),
        ('X-XSS-Protection', '1; mode=block'),
        ('Strict-Transport-Security', 'max-age=31536000; includeSubDomains'),
        ('Content-Security-Policy', "default-src 'self'"),
    ):
        outgoing_headers[header_name] = header_value

    return response

Error Handling Middleware

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import traceback

async def error_handling_middleware(context, call_next):
    """Turn exceptions raised downstream into HTTP error responses.

    ValueError maps to 400 and PermissionError to 403, both as JSON with the
    exception message. Any other exception is printed with its traceback and
    becomes a 500: the raw traceback as plain text when ``context.app.debug``
    is truthy, otherwise a generic JSON error.
    """

    def json_error(status, payload):
        # Shared shape of every JSON error response produced below.
        return {
            'status': status,
            'headers': {'Content-Type': 'application/json'},
            'body': json.dumps(payload),
        }

    try:
        return await call_next(context)
    except ValueError as exc:
        return json_error(400, {'error': 'Bad Request', 'message': str(exc)})
    except PermissionError as exc:
        return json_error(403, {'error': 'Forbidden', 'message': str(exc)})
    except Exception as exc:
        # Log error
        print(f"Unhandled error: {exc}")
        trace_text = traceback.format_exc()
        print(trace_text)

        if not context.app.debug:
            return json_error(500, {'error': 'Internal Server Error'})

        return {
            'status': 500,
            'headers': {'Content-Type': 'text/plain'},
            'body': trace_text,
        }

WebSocket Support

WebSocket Handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from pyframe.websockets import WebSocketManager

ws_manager = WebSocketManager()

@app.websocket('/ws/chat')
async def chat_websocket(websocket):
    """Relay every chat message received on this socket to all connected clients.

    The connection is registered with ``ws_manager`` first and is always
    unregistered on exit. Any exception raised while receiving or
    broadcasting ends the loop and is printed.
    """
    await ws_manager.connect(websocket)

    try:
        while True:
            incoming = await websocket.receive_json()

            # Forward only the expected fields, stamped with the server time.
            outgoing = dict(
                user=incoming['user'],
                text=incoming['text'],
                timestamp=datetime.now().isoformat(),
            )

            await ws_manager.broadcast(outgoing)
    except Exception as e:
        print(f"WebSocket error: {e}")
    finally:
        await ws_manager.disconnect(websocket)

WebSocket Manager API

WebSocketManager

Manages WebSocket connections and messaging.

Methods
async connect(websocket)

Register a new WebSocket connection.

async disconnect(websocket)

Remove a WebSocket connection.

async broadcast(message)

Send message to all connected clients.

async send_to_user(user_id, message)

Send message to specific user.

async join_group(websocket, group_name)

Add connection to a group.

async leave_group(websocket, group_name)

Remove connection from a group.

async broadcast_to_group(group_name, message)

Send message to all connections in a group.

get_connection_count() -> int

Get total number of active connections.

get_group_size(group_name) -> int

Get number of connections in a group.


Server-Sent Events

SSE Handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from pyframe.sse import SSEManager

sse_manager = SSEManager()

@app.route('/events')
async def event_stream(context):
    """Open a Server-Sent Events stream for the current (or anonymous) user."""

    async def event_generator():
        # Greet the client first so it knows the stream is live.
        connected_at = datetime.now().isoformat()
        yield {'event': 'connected', 'data': {'timestamp': connected_at}}

        # The subscriber identity is resolved only once the generator
        # resumes after that first event.
        if context.user:
            user_id = context.user.id
        else:
            user_id = 'anonymous'

        async for event in sse_manager.subscribe(f"user_{user_id}"):
            yield event

    stream_headers = {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
    }
    return {'status': 200, 'headers': stream_headers, 'body': event_generator()}

# Send events from other parts of your application
# Must run inside a coroutine, since send_event is awaited. The channel name
# 'user_123' follows the f"user_{user_id}" pattern that event_stream
# subscribes to.
await sse_manager.send_event('user_123', {
    'event': 'notification',
    'data': {'message': 'You have a new message'}
})

Static File Serving

Static File Configuration

1
2
3
4
5
6
7
8
9
10
# Serve static files from 'static' directory at '/static/' URL
# (first argument: URL prefix, second argument: directory on disk)
app.add_static_path('/static/', 'static/')

# Serve uploaded files from the absolute directory /var/uploads/
app.add_static_path('/uploads/', '/var/uploads/')

# Serve with custom headers; max-age=31536000 seconds is 365 days.
# NOTE(review): presumably the headers apply to every file under the prefix.
app.add_static_path('/assets/', 'assets/', headers={
    'Cache-Control': 'public, max-age=31536000'
})

Custom Static File Handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import mimetypes
import os
from pathlib import Path

async def custom_static_handler(context):
    """Serve a file from the ``static`` directory with caching headers.

    Args:
        context: Request context. ``context.params['file_path']`` is the path
            relative to ``static``; ``context.get_header`` is used to read
            ``If-Modified-Since``.

    Returns:
        A response dict: 403 when the path escapes ``static``, 404 when it is
        not an existing regular file, 304 (no body) when the client's copy is
        still current, otherwise 200 with the file's bytes.
    """
    # Local imports: the snippet's top-level imports do not include these.
    from datetime import datetime, timezone
    from email.utils import format_datetime, parsedate_to_datetime

    # Get requested file path, fully resolved so ``..`` and symlinks are
    # taken into account by the containment check below.
    static_root = Path('static').resolve()
    file_path = context.params.get('file_path', '')
    full_path = (static_root / file_path).resolve()

    # Security check - prevent directory traversal. relative_to() compares
    # whole path components, so a sibling such as ``static_secret`` is
    # rejected even though its name starts with ``static``.
    try:
        full_path.relative_to(static_root)
    except ValueError:
        return {'status': 403, 'body': 'Forbidden'}

    # Check if file exists (is_file() is False for missing paths too)
    if not full_path.is_file():
        return {'status': 404, 'body': 'Not Found'}

    # HTTP dates have one-second resolution, so drop the fractional part
    # before formatting and comparing.
    file_mtime = full_path.stat().st_mtime
    last_modified = datetime.fromtimestamp(int(file_mtime), tz=timezone.utc)

    cache_headers = {
        # Formatted from a UTC datetime so the "GMT" label is truthful.
        'Last-Modified': format_datetime(last_modified, usegmt=True),
        'Cache-Control': 'public, max-age=3600'
    }

    # Check If-Modified-Since header; an unparsable value is ignored.
    if_modified_since = context.get_header('If-Modified-Since')
    if if_modified_since:
        try:
            client_copy_time = parsedate_to_datetime(if_modified_since)
        except (TypeError, ValueError):
            client_copy_time = None
        if client_copy_time is not None:
            if client_copy_time.tzinfo is None:
                # A date without zone information is treated as UTC.
                client_copy_time = client_copy_time.replace(tzinfo=timezone.utc)
            if last_modified <= client_copy_time:
                return {'status': 304, 'headers': cache_headers}

    # Determine content type
    content_type, _ = mimetypes.guess_type(str(full_path))
    content_type = content_type or 'application/octet-stream'

    # Read file content
    with open(full_path, 'rb') as f:
        content = f.read()

    # Prepare response headers. Content-Length is taken from the bytes that
    # were actually read, so it always matches the body.
    headers = {
        'Content-Type': content_type,
        'Content-Length': str(len(content)),
    }
    headers.update(cache_headers)

    return {
        'status': 200,
        'headers': headers,
        'body': content
    }

# Register custom static handler
app.route('/custom-static/<path:file_path>')(custom_static_handler)

Performance Monitoring

Server Metrics

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
class ServerMetrics:
    """Accumulate per-request counters and derive summary statistics."""

    def __init__(self):
        # Wall-clock moment the collector was created; uptime is measured from it.
        self.start_time = time.time()
        self.request_count = 0
        self.error_count = 0
        self.total_response_time = 0

    def record_request(self, response_time, status_code):
        """Count one request; status codes of 400 and above count as errors."""
        self.request_count += 1
        self.total_response_time += response_time

        is_error = status_code >= 400
        if is_error:
            self.error_count += 1

    def get_stats(self):
        """Return a snapshot dict of the counters and the rates derived from them.

        Ratios fall back to 0 when nothing has been recorded yet (or when no
        time has elapsed), so no division by zero can occur.
        """
        uptime = time.time() - self.start_time
        served = self.request_count

        if served > 0:
            avg_response_time = self.total_response_time / served
            error_rate = self.error_count / served
        else:
            avg_response_time = 0
            error_rate = 0

        if uptime > 0:
            requests_per_second = served / uptime
        else:
            requests_per_second = 0

        return {
            'uptime_seconds': uptime,
            'request_count': served,
            'error_count': self.error_count,
            'error_rate': error_rate,
            'avg_response_time': avg_response_time,
            'requests_per_second': requests_per_second,
        }

# Global metrics instance
server_metrics = ServerMetrics()

# Metrics middleware
async def metrics_middleware(context, call_next):
    """Time the downstream handler and record the result in ``server_metrics``.

    Args:
        context: Request context, passed through unchanged.
        call_next: Awaitable callable that runs the next middleware/handler.

    Returns:
        The downstream response, unmodified.
    """
    # perf_counter() is monotonic, so the measured duration cannot be skewed
    # (or go negative) if the system clock is adjusted mid-request.
    started = time.perf_counter()

    response = await call_next(context)

    response_time = time.perf_counter() - started
    # A response without an explicit status is counted as a 200.
    status_code = response.get('status', 200)

    server_metrics.record_request(response_time, status_code)

    return response

# Metrics endpoint
@app.route('/admin/metrics')
async def get_metrics(context):
    """Expose the current server statistics as a JSON document."""
    serialized_stats = json.dumps(server_metrics.get_stats())
    return {
        'status': 200,
        'headers': {'Content-Type': 'application/json'},
        'body': serialized_stats,
    }

This completes the Server API reference. PyFrame provides a powerful and flexible server system for building modern web applications! 🚀