Real-Time Features
Real-Time Features
PyFrame provides built-in real-time capabilities through WebSockets, Server-Sent Events (SSE), and live data synchronization, making it easy to build interactive, collaborative applications.
🌐 WebSocket Integration
Basic WebSocket Setup
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from pyframe import PyFrameApp
from pyframe.realtime import WebSocketManager
app = PyFrameApp()
ws_manager = WebSocketManager()
@app.websocket('/ws/chat')
async def chat_handler(websocket):
await ws_manager.connect(websocket)
try:
while True:
# Receive message from client
data = await websocket.receive_json()
# Process message
message = {
'user': data['user'],
'text': data['text'],
'timestamp': datetime.now().isoformat(),
'type': 'message'
}
# Broadcast to all connected clients
await ws_manager.broadcast(message)
except Exception as e:
print(f"WebSocket error: {e}")
finally:
await ws_manager.disconnect(websocket)
WebSocket with User Groups
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@app.websocket('/ws/room/<room_id>')
async def room_handler(websocket, room_id):
user_id = await authenticate_websocket(websocket)
if not user_id:
await websocket.close(code=4001, reason="Authentication required")
return
# Join room-specific group
await ws_manager.join_group(websocket, f"room_{room_id}")
try:
# Send room history to new user
history = await get_room_history(room_id)
await websocket.send_json({
'type': 'history',
'messages': history
})
# Notify others that user joined
await ws_manager.broadcast_to_group(f"room_{room_id}", {
'type': 'user_joined',
'user_id': user_id,
'message': f"User {user_id} joined the room"
}, exclude=websocket)
while True:
data = await websocket.receive_json()
# Save message to database
message = await save_message(room_id, user_id, data['text'])
# Broadcast to room members
await ws_manager.broadcast_to_group(f"room_{room_id}", {
'type': 'message',
'id': message.id,
'user_id': user_id,
'text': data['text'],
'timestamp': message.created_at.isoformat()
})
except Exception as e:
print(f"Room WebSocket error: {e}")
finally:
await ws_manager.leave_group(websocket, f"room_{room_id}")
await ws_manager.broadcast_to_group(f"room_{room_id}", {
'type': 'user_left',
'user_id': user_id,
'message': f"User {user_id} left the room"
})
📡 Server-Sent Events (SSE)
Basic SSE Implementation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
from pyframe.realtime import SSEManager
sse_manager = SSEManager()
@app.route('/events')
async def event_stream(context):
user_id = context.user.id
# Create SSE response
async def event_generator():
# Send initial connection event
yield {
'event': 'connected',
'data': {'user_id': user_id, 'timestamp': datetime.now().isoformat()}
}
# Subscribe to user-specific events
async for event in sse_manager.subscribe(f"user_{user_id}"):
yield event
return {
'status': 200,
'headers': {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
},
'body': event_generator()
}
# Send events from elsewhere in your application
async def notify_user(user_id, event_type, data):
await sse_manager.send_event(f"user_{user_id}", {
'event': event_type,
'data': data
})
# Example: Notify user of new message
await notify_user(123, 'new_message', {
'message_id': 456,
'sender': 'Alice',
'preview': 'Hello there!'
})
SSE with Filtering
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@app.route('/events/notifications')
async def notification_stream(context):
user_id = context.user.id
async def filtered_events():
async for event in sse_manager.subscribe(f"notifications_{user_id}"):
# Filter events based on user preferences
if should_send_notification(user_id, event):
yield {
'event': 'notification',
'data': event['data'],
'id': event.get('id', str(uuid.uuid4()))
}
return {
'status': 200,
'headers': {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Access-Control-Allow-Origin': '*'
},
'body': filtered_events()
}
🔄 Live Data Synchronization
Database Change Streaming
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
from pyframe.realtime import LiveSync
from pyframe import Model, Field, FieldType
class Post(Model):
title = Field(FieldType.STRING, max_length=200)
content = Field(FieldType.TEXT)
author_id = Field(FieldType.INTEGER)
published = Field(FieldType.BOOLEAN, default=False)
# Set up live sync
live_sync = LiveSync()
# Sync model changes to WebSocket clients
@live_sync.watch(Post)
async def sync_post_changes(action, instance, old_instance=None):
"""Automatically sync post changes to connected clients"""
event_data = {
'model': 'Post',
'action': action, # 'created', 'updated', 'deleted'
'instance': instance.to_dict() if instance else None,
'old_instance': old_instance.to_dict() if old_instance else None
}
if action == 'created':
# Notify all users of new post
await ws_manager.broadcast({
'type': 'post_created',
'post': instance.to_dict()
})
elif action == 'updated':
# Notify users following this post
followers = await get_post_followers(instance.id)
for follower_id in followers:
await ws_manager.send_to_user(follower_id, {
'type': 'post_updated',
'post': instance.to_dict(),
'changes': get_changes(old_instance, instance)
})
elif action == 'deleted':
# Notify all clients of deletion
await ws_manager.broadcast({
'type': 'post_deleted',
'post_id': old_instance.id
})
# Enable live sync
live_sync.start()
Real-Time Component Updates
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
from pyframe import StatefulComponent
from pyframe.realtime import LiveComponent
class LivePostList(LiveComponent):
"""Component that automatically updates when posts change"""
def __init__(self, **props):
super().__init__(**props)
self.state = State({
'posts': [],
'loading': True,
'connected': False
})
async def component_did_mount(self):
# Load initial data
posts = await Post.filter(published=True).order_by('-created_at')
self.state.set_multiple({
'posts': [p.to_dict() for p in posts],
'loading': False
})
# Connect to live updates
await self.connect_live_updates()
async def connect_live_updates(self):
"""Connect to WebSocket for live updates"""
await self.ws_connect('/ws/posts')
self.state.update('connected', True)
async def handle_ws_message(self, message):
"""Handle incoming WebSocket messages"""
if message['type'] == 'post_created':
posts = self.state.get('posts')
posts.insert(0, message['post'])
self.state.update('posts', posts)
elif message['type'] == 'post_updated':
posts = self.state.get('posts')
for i, post in enumerate(posts):
if post['id'] == message['post']['id']:
posts[i] = message['post']
break
self.state.update('posts', posts)
elif message['type'] == 'post_deleted':
posts = self.state.get('posts')
posts = [p for p in posts if p['id'] != message['post_id']]
self.state.update('posts', posts)
def render(self):
posts = self.state.get('posts')
loading = self.state.get('loading')
connected = self.state.get('connected')
if loading:
return '<div class="loading">Loading posts...</div>'
status_indicator = '🟢' if connected else '🔴'
posts_html = ''.join([
f'''
<article class="post" data-id="{post['id']}">
<h3>{post['title']}</h3>
<p>{post['content'][:100]}...</p>
<small>By {post['author_name']} • {post['created_at']}</small>
</article>
'''
for post in posts
])
return f'''
<div class="live-post-list">
<div class="status">
{status_indicator} Live Updates
</div>
<div class="posts">
{posts_html or '<p>No posts yet.</p>'}
</div>
</div>
'''
🎯 Collaborative Features
Real-Time Collaborative Editing
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
from pyframe.realtime import CollaborativeDocument
class DocumentEditor(StatefulComponent):
"""Real-time collaborative document editor"""
def __init__(self, document_id, **props):
super().__init__(**props)
self.document_id = document_id
self.state = State({
'content': '',
'cursors': {},
'users': [],
'saving': False
})
self.collab_doc = CollaborativeDocument(document_id)
async def component_did_mount(self):
# Load document content
doc = await Document.get(self.document_id)
self.state.update('content', doc.content)
# Join collaborative session
await self.collab_doc.join(self.handle_doc_changes)
# Load current users
users = await self.collab_doc.get_active_users()
self.state.update('users', users)
async def handle_doc_changes(self, changes):
"""Handle incoming collaborative changes"""
if changes['type'] == 'content_change':
# Apply operational transformation
new_content = self.apply_ot_changes(
self.state.get('content'),
changes['operations']
)
self.state.update('content', new_content)
elif changes['type'] == 'cursor_update':
cursors = self.state.get('cursors')
cursors[changes['user_id']] = changes['position']
self.state.update('cursors', cursors)
elif changes['type'] == 'user_joined':
users = self.state.get('users')
users.append(changes['user'])
self.state.update('users', users)
elif changes['type'] == 'user_left':
users = self.state.get('users')
users = [u for u in users if u['id'] != changes['user_id']]
self.state.update('users', users)
# Remove user's cursor
cursors = self.state.get('cursors')
cursors.pop(changes['user_id'], None)
self.state.update('cursors', cursors)
async def handle_text_change(self, new_content, cursor_position):
"""Handle local text changes"""
old_content = self.state.get('content')
# Generate operational transformation operations
operations = self.generate_ot_operations(old_content, new_content)
# Send changes to other users
await self.collab_doc.send_changes({
'type': 'content_change',
'operations': operations,
'cursor_position': cursor_position
})
# Update local state
self.state.update('content', new_content)
# Auto-save after delay
self.schedule_save()
def schedule_save(self):
"""Schedule document save with debouncing"""
if hasattr(self, 'save_timer'):
self.save_timer.cancel()
self.save_timer = asyncio.create_task(self.auto_save())
async def auto_save(self):
"""Auto-save document after delay"""
await asyncio.sleep(2) # 2 second delay
self.state.update('saving', True)
try:
await Document.filter(id=self.document_id).update(
content=self.state.get('content'),
updated_at=datetime.now()
)
finally:
self.state.update('saving', False)
def render(self):
content = self.state.get('content')
users = self.state.get('users')
cursors = self.state.get('cursors')
saving = self.state.get('saving')
# Render user avatars
user_avatars = ''.join([
f'<img src="{user["avatar"]}" alt="{user["name"]}" title="{user["name"]}">'
for user in users
])
# Render cursor indicators
cursor_indicators = ''.join([
f'<span class="cursor" style="left: {pos}px;" data-user="{user_id}"></span>'
for user_id, pos in cursors.items()
])
save_indicator = '💾 Saving...' if saving else '✅ Saved'
return f'''
<div class="document-editor">
<div class="toolbar">
<div class="users">
{user_avatars}
<span class="user-count">{len(users)} online</span>
</div>
<div class="save-status">{save_indicator}</div>
</div>
<div class="editor-container">
<textarea
class="editor"
value="{content}"
oninput="this.component.handle_text_change(this.value, this.selectionStart)"
onselectionchange="this.component.handle_cursor_move(this.selectionStart)"
></textarea>
{cursor_indicators}
</div>
</div>
'''
Real-Time Notifications
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
from pyframe.realtime import NotificationManager
notification_manager = NotificationManager()
class NotificationSystem(StatefulComponent):
"""Real-time notification system"""
def __init__(self, **props):
super().__init__(**props)
self.state = State({
'notifications': [],
'unread_count': 0,
'show_panel': False
})
async def component_did_mount(self):
# Load existing notifications
notifications = await Notification.filter(
user_id=self.context.user.id,
created_at__gte=datetime.now() - timedelta(days=7)
).order_by('-created_at')
self.state.set_multiple({
'notifications': [n.to_dict() for n in notifications],
'unread_count': len([n for n in notifications if not n.read])
})
# Connect to real-time notifications
await self.connect_notifications()
async def connect_notifications(self):
"""Connect to notification stream"""
user_id = self.context.user.id
async def handle_notification(notification):
notifications = self.state.get('notifications')
notifications.insert(0, notification)
# Keep only recent notifications
notifications = notifications[:50]
unread_count = self.state.get('unread_count')
if not notification['read']:
unread_count += 1
self.state.set_multiple({
'notifications': notifications,
'unread_count': unread_count
})
# Show browser notification if permission granted
self.show_browser_notification(notification)
await notification_manager.subscribe(f"user_{user_id}", handle_notification)
def show_browser_notification(self, notification):
"""Show browser notification"""
self.execute_js(f'''
if (Notification.permission === "granted") {{
new Notification("{notification['title']}", {{
body: "{notification['message']}",
icon: "/static/notification-icon.png"
}});
}}
''')
async def mark_as_read(self, notification_id):
"""Mark notification as read"""
await Notification.filter(id=notification_id).update(read=True)
notifications = self.state.get('notifications')
for notification in notifications:
if notification['id'] == notification_id:
notification['read'] = True
break
unread_count = self.state.get('unread_count')
self.state.set_multiple({
'notifications': notifications,
'unread_count': max(0, unread_count - 1)
})
def toggle_panel(self):
"""Toggle notification panel"""
show_panel = not self.state.get('show_panel')
self.state.update('show_panel', show_panel)
if show_panel:
# Mark all as read when panel is opened
self.mark_all_as_read()
async def mark_all_as_read(self):
"""Mark all notifications as read"""
user_id = self.context.user.id
await Notification.filter(user_id=user_id, read=False).update(read=True)
notifications = self.state.get('notifications')
for notification in notifications:
notification['read'] = True
self.state.set_multiple({
'notifications': notifications,
'unread_count': 0
})
def render(self):
notifications = self.state.get('notifications')
unread_count = self.state.get('unread_count')
show_panel = self.state.get('show_panel')
# Notification badge
badge = f'<span class="badge">{unread_count}</span>' if unread_count > 0 else ''
# Notification list
notification_items = ''.join([
f'''
<div class="notification-item {'unread' if not n['read'] else ''}"
onclick="this.component.mark_as_read({n['id']})">
<div class="notification-icon">{n['icon']}</div>
<div class="notification-content">
<h4>{n['title']}</h4>
<p>{n['message']}</p>
<small>{n['created_at']}</small>
</div>
</div>
'''
for n in notifications
])
panel_class = 'notification-panel active' if show_panel else 'notification-panel'
return f'''
<div class="notification-system">
<button class="notification-button" onclick="this.component.toggle_panel()">
🔔 {badge}
</button>
<div class="{panel_class}">
<div class="panel-header">
<h3>Notifications</h3>
{f'<button onclick="this.component.mark_all_as_read()">Mark all as read</button>' if unread_count > 0 else ''}
</div>
<div class="notification-list">
{notification_items or '<p class="empty">No notifications</p>'}
</div>
</div>
</div>
'''
# Usage: Send notifications from anywhere in your app
async def send_notification(user_id, title, message, icon="📢"):
notification = await Notification.create(
user_id=user_id,
title=title,
message=message,
icon=icon
)
await notification_manager.send(f"user_{user_id}", notification.to_dict())
⚡ Performance Optimization
Connection Pooling
1
2
3
4
5
6
7
8
9
10
11
from pyframe.realtime import ConnectionPool
# Configure connection pooling
connection_pool = ConnectionPool(
max_connections=1000,
cleanup_interval=60, # seconds
connection_timeout=300 # 5 minutes
)
# Use with WebSocket manager
ws_manager = WebSocketManager(connection_pool=connection_pool)
Message Queuing
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from pyframe.realtime import MessageQueue
# Set up message queue for handling high-volume events
message_queue = MessageQueue(
backend='redis', # or 'memory', 'database'
max_queue_size=10000,
batch_size=50
)
@message_queue.handler('user_notifications')
async def process_notifications(messages):
"""Process notification messages in batches"""
for message in messages:
await send_notification(
message['user_id'],
message['title'],
message['content']
)
# Queue notifications instead of sending immediately
await message_queue.enqueue('user_notifications', {
'user_id': 123,
'title': 'New Message',
'content': 'You have a new message from Alice'
})
🔒 Security Considerations
WebSocket Authentication
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
async def authenticate_websocket(websocket):
"""Authenticate WebSocket connections"""
try:
# Get token from query parameters or headers
token = websocket.query_params.get('token')
if not token:
return None
# Verify JWT token
payload = jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
user_id = payload['user_id']
# Verify user exists and is active
user = await User.get(id=user_id, is_active=True)
return user
except Exception:
return None
@app.websocket('/ws/secure')
async def secure_websocket(websocket):
user = await authenticate_websocket(websocket)
if not user:
await websocket.close(code=4001, reason="Authentication required")
return
# Continue with authenticated connection
await handle_authenticated_websocket(websocket, user)
Rate Limiting
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from pyframe.realtime import RateLimiter
rate_limiter = RateLimiter(
max_requests=100, # per window
window_size=60 # seconds
)
@app.websocket('/ws/chat')
async def rate_limited_chat(websocket):
user = await authenticate_websocket(websocket)
try:
while True:
message = await websocket.receive_json()
# Check rate limit
if not await rate_limiter.allow(f"user_{user.id}"):
await websocket.send_json({
'error': 'Rate limit exceeded',
'retry_after': 60
})
continue
# Process message
await process_chat_message(user, message)
except Exception as e:
print(f"Chat error: {e}")
finally:
await ws_manager.disconnect(websocket)
📚 Best Practices
1. Connection Management
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# Good: Properly handle connection lifecycle
class WebSocketHandler:
def __init__(self):
self.connections = set()
async def connect(self, websocket):
self.connections.add(websocket)
await self.send_welcome(websocket)
async def disconnect(self, websocket):
self.connections.discard(websocket)
await self.cleanup_user_data(websocket)
async def broadcast(self, message):
# Remove dead connections
dead_connections = set()
for websocket in self.connections:
try:
await websocket.send_json(message)
except Exception:
dead_connections.add(websocket)
self.connections -= dead_connections
2. Error Handling
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@app.websocket('/ws/chat')
async def robust_chat(websocket):
try:
await ws_manager.connect(websocket)
while True:
try:
message = await asyncio.wait_for(
websocket.receive_json(),
timeout=30.0 # 30 second timeout
)
await process_message(message)
except asyncio.TimeoutError:
# Send ping to keep connection alive
await websocket.ping()
except json.JSONDecodeError:
await websocket.send_json({
'error': 'Invalid JSON format'
})
except Exception as e:
logger.error(f"WebSocket error: {e}")
finally:
await ws_manager.disconnect(websocket)
3. Scalability
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# Use Redis for scaling across multiple servers
from pyframe.realtime import RedisBackend
redis_backend = RedisBackend(
host='localhost',
port=6379,
db=0
)
ws_manager = WebSocketManager(backend=redis_backend)
# Messages will be synchronized across all server instances
await ws_manager.broadcast({
'type': 'announcement',
'message': 'System maintenance in 5 minutes'
})
Real-time features are what make modern web applications feel alive and engaging. PyFrame makes it easy to add these capabilities to your applications! 🚀