Performance Optimization

Optimize your PyFrame applications for maximum speed, efficiency, and scalability. This guide covers all aspects of performance optimization.

📊 Performance Fundamentals

Understanding Performance Metrics

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import time
import psutil
import asyncio
from functools import wraps

class PerformanceMonitor:
    """Monitor application performance metrics"""
    
    def __init__(self):
        self.metrics = {
            'requests_processed': 0,
            'total_response_time': 0,
            'avg_response_time': 0,
            'memory_usage': 0,
            'cpu_usage': 0
        }
    
    def track_request(self, response_time):
        """Track request response time"""
        self.metrics['requests_processed'] += 1
        self.metrics['total_response_time'] += response_time
        self.metrics['avg_response_time'] = (
            self.metrics['total_response_time'] / 
            self.metrics['requests_processed']
        )
    
    def track_resources(self):
        """Track system resource usage"""
        process = psutil.Process()
        self.metrics['memory_usage'] = process.memory_info().rss / 1024 / 1024  # MB
        self.metrics['cpu_usage'] = process.cpu_percent()
    
    def get_metrics(self):
        """Get current performance metrics"""
        self.track_resources()
        return self.metrics.copy()

# Global performance monitor
perf_monitor = PerformanceMonitor()

def performance_middleware(app):
    """Middleware to track request performance"""
    
    async def middleware(context, call_next):
        start_time = time.time()
        
        response = await call_next(context)
        
        response_time = time.time() - start_time
        perf_monitor.track_request(response_time)
        
        # Add performance headers
        response.setdefault('headers', {})
        response['headers']['X-Response-Time'] = f"{response_time:.3f}s"
        
        return response
    
    return middleware

🚀 Component Optimization

Component Memoization

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
from functools import lru_cache
import json
import hashlib

class MemoizedComponent:
    """Base class for memoized components"""
    
    _render_cache = {}
    _cache_hits = 0
    _cache_misses = 0
    
    def __init__(self, **props):
        self.props = props
        self._memo_key = self._generate_memo_key()
    
    def _generate_memo_key(self):
        """Generate cache key from props"""
        props_str = json.dumps(self.props, sort_keys=True, default=str)
        return hashlib.md5(props_str.encode()).hexdigest()
    
    def render(self):
        """Memoized render method"""
        if self._memo_key in self._render_cache:
            self._cache_hits += 1
            return self._render_cache[self._memo_key]
        
        self._cache_misses += 1
        result = self._render_impl()
        
        # Cache management - limit cache size
        if len(self._render_cache) > 1000:
            # Remove oldest 20% of entries
            keys_to_remove = list(self._render_cache.keys())[:200]
            for key in keys_to_remove:
                del self._render_cache[key]
        
        self._render_cache[self._memo_key] = result
        return result
    
    def _render_impl(self):
        """Implement this method with actual render logic"""
        raise NotImplementedError
    
    @classmethod
    def get_cache_stats(cls):
        """Get cache performance statistics"""
        total = cls._cache_hits + cls._cache_misses
        hit_rate = cls._cache_hits / total if total > 0 else 0
        return {
            'hits': cls._cache_hits,
            'misses': cls._cache_misses,
            'hit_rate': hit_rate,
            'cache_size': len(cls._render_cache)
        }

# Usage
class ProductCard(MemoizedComponent):
    """Memoized product card component"""
    
    def _render_impl(self):
        product = self.props['product']
        show_description = self.props.get('show_description', True)
        
        description = (
            f'<p class="description">{product["description"]}</p>'
            if show_description else ''
        )
        
        return f'''
        <div class="product-card">
            <img src="{product['image']}" alt="{product['name']}">
            <h3>{product['name']}</h3>
            <span class="price">${product['price']}</span>
            {description}
        </div>
        '''

# Efficient list rendering
class ProductList(Component):
    def render(self):
        products = self.props['products']
        show_descriptions = self.props.get('show_descriptions', True)
        
        # Each product card is memoized
        product_cards = [
            ProductCard(
                product=product,
                show_description=show_descriptions
            ).render()
            for product in products
        ]
        
        return f'''
        <div class="product-list">
            {''.join(product_cards)}
        </div>
        '''

Virtual Scrolling for Large Lists

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
class VirtualScrollList(StatefulComponent):
    """Virtual scrolling for large datasets"""
    
    def __init__(self, **props):
        super().__init__(**props)
        
        self.items = props['items']
        self.item_height = props.get('item_height', 50)
        self.container_height = props.get('container_height', 400)
        self.overscan = props.get('overscan', 5)  # Extra items to render
        self.render_item = props['render_item']
        
        self.state = State({
            'scroll_top': 0,
            'visible_start': 0,
            'visible_count': 0
        })
        
        self._update_visible_range()
    
    def _update_visible_range(self):
        """Calculate which items should be visible"""
        scroll_top = self.state.get('scroll_top')
        
        # Calculate visible range
        start_index = max(0, int(scroll_top / self.item_height) - self.overscan)
        visible_count = int(self.container_height / self.item_height) + (2 * self.overscan)
        
        self.state.set_multiple({
            'visible_start': start_index,
            'visible_count': min(visible_count, len(self.items) - start_index)
        })
    
    def handle_scroll(self, scroll_top):
        """Handle scroll events efficiently"""
        # Throttle scroll updates
        if abs(scroll_top - self.state.get('scroll_top')) > 10:
            self.state.update('scroll_top', scroll_top)
            self._update_visible_range()
    
    def render(self):
        visible_start = self.state.get('visible_start')
        visible_count = self.state.get('visible_count')
        visible_end = visible_start + visible_count
        
        # Calculate spacer heights
        top_spacer = visible_start * self.item_height
        bottom_spacer = (len(self.items) - visible_end) * self.item_height
        
        # Render only visible items
        visible_items = []
        for i in range(visible_start, min(visible_end, len(self.items))):
            item_html = self.render_item(self.items[i], i)
            visible_items.append(
                f'<div class="virtual-item" style="height: {self.item_height}px;" data-index="{i}">'
                f'{item_html}'
                f'</div>'
            )
        
        return f'''
        <div class="virtual-scroll-container" 
             style="height: {self.container_height}px; overflow-y: auto;"
             onscroll="this.component.handle_scroll(this.scrollTop)">
            
            <div class="virtual-spacer-top" style="height: {top_spacer}px;"></div>
            
            <div class="virtual-items">
                {''.join(visible_items)}
            </div>
            
            <div class="virtual-spacer-bottom" style="height: {bottom_spacer}px;"></div>
        </div>
        '''

# Usage with 100,000 items
large_dataset = [{'id': i, 'name': f'Item {i}'} for i in range(100000)]

def render_item(item, index):
    return f'<span>{item["name"]}</span>'

virtual_list = VirtualScrollList(
    items=large_dataset,
    item_height=40,
    container_height=600,
    render_item=render_item
)

Lazy Component Loading

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import asyncio
from functools import wraps

class LazyComponent:
    """Lazy-loaded component with loading states"""
    
    def __init__(self, loader, fallback="Loading..."):
        self.loader = loader  # Function that returns component
        self.fallback = fallback
        self._component = None
        self._loading = False
        self._error = None
    
    async def load(self):
        """Load the component asynchronously"""
        if self._component or self._loading:
            return
        
        try:
            self._loading = True
            self._component = await self.loader()
        except Exception as e:
            self._error = str(e)
        finally:
            self._loading = False
    
    def render(self, **props):
        """Render component or fallback"""
        if self._error:
            return f'<div class="error">Error loading component: {self._error}</div>'
        
        if self._component:
            return self._component(**props).render()
        
        if not self._loading:
            # Start loading
            asyncio.create_task(self.load())
        
        return f'<div class="loading">{self.fallback}</div>'

# Component factory functions
async def load_chart_component():
    """Simulate loading heavy charting component"""
    await asyncio.sleep(0.1)  # Simulate network delay
    
    class ChartComponent(Component):
        def render(self):
            data = self.props.get('data', [])
            return f'<div class="chart">Chart with {len(data)} data points</div>'
    
    return ChartComponent

async def load_data_table():
    """Simulate loading data table component"""
    await asyncio.sleep(0.05)
    
    class DataTable(Component):
        def render(self):
            rows = self.props.get('rows', [])
            return f'<table class="data-table">Table with {len(rows)} rows</table>'
    
    return DataTable

# Create lazy components
LazyChart = LazyComponent(load_chart_component, "Loading chart...")
LazyTable = LazyComponent(load_data_table, "Loading table...")

class Dashboard(StatefulComponent):
    def __init__(self, **props):
        super().__init__(**props)
        self.state = State({'active_tab': 'overview'})
    
    def render(self):
        active_tab = self.state.get('active_tab')
        
        return f'''
        <div class="dashboard">
            <nav class="tab-nav">
                <button onclick="this.component.set_tab('overview')"
                        class="{'active' if active_tab == 'overview' else ''}">
                    Overview
                </button>
                <button onclick="this.component.set_tab('charts')"
                        class="{'active' if active_tab == 'charts' else ''}">
                    Charts
                </button>
                <button onclick="this.component.set_tab('data')"
                        class="{'active' if active_tab == 'data' else ''}">
                    Data
                </button>
            </nav>
            
            <div class="tab-content">
                {self.render_tab_content(active_tab)}
            </div>
        </div>
        '''
    
    def render_tab_content(self, tab):
        if tab == 'overview':
            return '<div class="overview">Dashboard Overview</div>'
        elif tab == 'charts':
            # Lazy load chart component
            return LazyChart.render(data=self.get_chart_data())
        elif tab == 'data':
            # Lazy load table component
            return LazyTable.render(rows=self.get_table_data())
    
    def set_tab(self, tab):
        self.state.update('active_tab', tab)

🗃️ Database Optimization

Query Optimization

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
from pyframe import Model, Field, FieldType

class OptimizedUserQuery:
    """Optimized user data queries"""
    
    @staticmethod
    async def get_user_with_posts(user_id, limit=10):
        """Get user with their recent posts in one query"""
        # Use select_related to avoid N+1 queries
        user = await User.select_related('profile').get(id=user_id)
        
        # Get posts with eager loading
        posts = await (Post
                      .select_related('author')
                      .filter(author_id=user_id)
                      .order_by('-created_at')
                      .limit(limit))
        
        user._posts = posts  # Attach posts to user object
        return user
    
    @staticmethod
    async def get_users_with_post_counts():
        """Get users with their post counts using aggregation"""
        from pyframe.models import Count
        
        return await (User
                     .annotate(post_count=Count('posts'))
                     .order_by('-post_count'))
    
    @staticmethod
    async def search_users_optimized(query, page=1, page_size=20):
        """Optimized user search with pagination"""
        offset = (page - 1) * page_size
        
        # Use database full-text search if available
        users = await (User
                      .filter(name__icontains=query)
                      .select_related('profile')
                      .offset(offset)
                      .limit(page_size))
        
        # Get total count for pagination
        total_count = await User.filter(name__icontains=query).count()
        
        return {
            'users': users,
            'total_count': total_count,
            'page': page,
            'page_size': page_size,
            'total_pages': (total_count + page_size - 1) // page_size
        }

# Bulk operations for better performance
class BulkOperations:
    """Efficient bulk database operations"""
    
    @staticmethod
    async def bulk_create_users(user_data_list):
        """Create multiple users efficiently"""
        users = [User(**data) for data in user_data_list]
        return await User.bulk_create(users)
    
    @staticmethod
    async def bulk_update_user_status(user_ids, status):
        """Update multiple users at once"""
        return await User.filter(id__in=user_ids).update(status=status)
    
    @staticmethod
    async def bulk_delete_old_posts(days_old=30):
        """Delete old posts in batch"""
        cutoff_date = datetime.now() - timedelta(days=days_old)
        return await Post.filter(created_at__lt=cutoff_date).delete()

Connection Pooling

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import asyncpg
from pyframe.database import DatabasePool

class OptimizedDatabasePool:
    """Optimized database connection pool"""
    
    def __init__(self, database_url, **pool_config):
        self.database_url = database_url
        self.pool_config = {
            'min_size': 5,
            'max_size': 20,
            'max_queries': 50000,
            'max_inactive_connection_lifetime': 300,
            'command_timeout': 60,
            **pool_config
        }
        self.pool = None
    
    async def initialize(self):
        """Initialize connection pool"""
        self.pool = await asyncpg.create_pool(
            self.database_url,
            **self.pool_config
        )
    
    async def execute_query(self, query, *args):
        """Execute query with connection from pool"""
        async with self.pool.acquire() as connection:
            return await connection.fetch(query, *args)
    
    async def execute_transaction(self, queries):
        """Execute multiple queries in a transaction"""
        async with self.pool.acquire() as connection:
            async with connection.transaction():
                results = []
                for query, args in queries:
                    result = await connection.fetch(query, *args)
                    results.append(result)
                return results
    
    async def get_pool_stats(self):
        """Get connection pool statistics"""
        return {
            'size': self.pool.get_size(),
            'min_size': self.pool.get_min_size(),
            'max_size': self.pool.get_max_size(),
            'idle_size': self.pool.get_idle_size()
        }
    
    async def close(self):
        """Close all connections"""
        if self.pool:
            await self.pool.close()

🌐 HTTP and Caching Optimization

Response Caching

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import hashlib
import pickle
from datetime import datetime, timedelta

class ResponseCache:
    """HTTP response caching system"""
    
    def __init__(self, backend='memory', default_ttl=300):
        self.backend = backend
        self.default_ttl = default_ttl
        self._memory_cache = {}
        
        if backend == 'redis':
            import redis
            self.redis = redis.Redis()
    
    def generate_cache_key(self, context):
        """Generate cache key from request context"""
        key_parts = [
            context.method,
            context.path,
            sorted(context.query_params.items()) if context.query_params else '',
            context.headers.get('Accept', ''),
            getattr(context.user, 'id', 'anonymous') if hasattr(context, 'user') else 'anonymous'
        ]
        
        key_string = '|'.join(str(part) for part in key_parts)
        return hashlib.md5(key_string.encode()).hexdigest()
    
    async def get(self, key):
        """Get cached response"""
        if self.backend == 'memory':
            cached = self._memory_cache.get(key)
            if cached and cached['expires'] > datetime.now():
                return cached['data']
            elif cached:
                # Expired, remove from cache
                del self._memory_cache[key]
        
        elif self.backend == 'redis':
            data = self.redis.get(key)
            if data:
                return pickle.loads(data)
        
        return None
    
    async def set(self, key, data, ttl=None):
        """Cache response data"""
        ttl = ttl or self.default_ttl
        
        if self.backend == 'memory':
            self._memory_cache[key] = {
                'data': data,
                'expires': datetime.now() + timedelta(seconds=ttl)
            }
            
            # Clean up expired entries periodically
            if len(self._memory_cache) > 1000:
                self._cleanup_expired()
        
        elif self.backend == 'redis':
            self.redis.setex(key, ttl, pickle.dumps(data))
    
    def _cleanup_expired(self):
        """Remove expired cache entries"""
        now = datetime.now()
        expired_keys = [
            key for key, value in self._memory_cache.items()
            if value['expires'] <= now
        ]
        for key in expired_keys:
            del self._memory_cache[key]

# Caching middleware
def create_cache_middleware(cache_config=None):
    """Create caching middleware"""
    cache = ResponseCache(**(cache_config or {}))
    
    async def cache_middleware(context, call_next):
        # Only cache GET requests
        if context.method != 'GET':
            return await call_next(context)
        
        # Check for cache control headers
        if context.headers.get('Cache-Control') == 'no-cache':
            return await call_next(context)
        
        # Generate cache key
        cache_key = cache.generate_cache_key(context)
        
        # Try to get cached response
        cached_response = await cache.get(cache_key)
        if cached_response:
            # Add cache hit header
            cached_response.setdefault('headers', {})
            cached_response['headers']['X-Cache'] = 'HIT'
            return cached_response
        
        # Process request
        response = await call_next(context)
        
        # Cache successful responses
        if response.get('status', 200) == 200:
            # Determine TTL from response headers
            ttl = 300  # Default 5 minutes
            if 'Cache-Control' in response.get('headers', {}):
                # Parse max-age from Cache-Control
                cache_control = response['headers']['Cache-Control']
                if 'max-age=' in cache_control:
                    ttl = int(cache_control.split('max-age=')[1].split(',')[0])
            
            await cache.set(cache_key, response, ttl)
            
            # Add cache miss header
            response.setdefault('headers', {})
            response['headers']['X-Cache'] = 'MISS'
        
        return response
    
    return cache_middleware

Static Asset Optimization

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import gzip
import hashlib
from pathlib import Path

class StaticAssetOptimizer:
    """Optimize static assets for better performance"""
    
    def __init__(self, static_dir, optimize_config=None):
        self.static_dir = Path(static_dir)
        self.config = {
            'enable_gzip': True,
            'enable_etags': True,
            'cache_max_age': 31536000,  # 1 year
            'gzip_threshold': 1024,  # Gzip files larger than 1KB
            **optimize_config or {}
        }
        self._file_cache = {}
    
    async def serve_static_file(self, file_path, context):
        """Serve optimized static file"""
        full_path = self.static_dir / file_path
        
        if not full_path.exists():
            return {'status': 404, 'body': 'File not found'}
        
        # Get file info
        file_stat = full_path.stat()
        file_size = file_stat.st_size
        file_mtime = file_stat.st_mtime
        
        # Generate ETag
        if self.config['enable_etags']:
            etag = hashlib.md5(f"{file_path}{file_mtime}{file_size}".encode()).hexdigest()
            
            # Check if client has cached version
            if context.headers.get('If-None-Match') == etag:
                return {'status': 304}  # Not Modified
        
        # Read file content
        cache_key = f"{file_path}:{file_mtime}"
        if cache_key in self._file_cache:
            content = self._file_cache[cache_key]
        else:
            with open(full_path, 'rb') as f:
                content = f.read()
            self._file_cache[cache_key] = content
        
        # Determine content type
        content_type = self._get_content_type(full_path.suffix)
        
        # Prepare headers
        headers = {
            'Content-Type': content_type,
            'Cache-Control': f'public, max-age={self.config["cache_max_age"]}',
        }
        
        if self.config['enable_etags']:
            headers['ETag'] = etag
        
        # Apply gzip compression if beneficial
        if (self.config['enable_gzip'] and 
            file_size > self.config['gzip_threshold'] and
            'gzip' in context.headers.get('Accept-Encoding', '') and
            content_type.startswith(('text/', 'application/javascript', 'application/css'))):
            
            content = gzip.compress(content)
            headers['Content-Encoding'] = 'gzip'
        
        headers['Content-Length'] = str(len(content))
        
        return {
            'status': 200,
            'headers': headers,
            'body': content
        }
    
    def _get_content_type(self, file_extension):
        """Get content type from file extension"""
        content_types = {
            '.html': 'text/html',
            '.css': 'text/css',
            '.js': 'application/javascript',
            '.json': 'application/json',
            '.png': 'image/png',
            '.jpg': 'image/jpeg',
            '.jpeg': 'image/jpeg',
            '.gif': 'image/gif',
            '.svg': 'image/svg+xml',
            '.ico': 'image/x-icon',
            '.woff': 'font/woff',
            '.woff2': 'font/woff2',
            '.ttf': 'font/ttf',
        }
        return content_types.get(file_extension.lower(), 'application/octet-stream')

⚡ Application-Level Optimizations

Background Task Processing

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import asyncio
from queue import Queue
import concurrent.futures

class BackgroundTaskProcessor:
    """Process background tasks efficiently"""
    
    def __init__(self, max_workers=4):
        self.max_workers = max_workers
        self.task_queue = asyncio.Queue()
        self.thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
        self.workers = []
        self.running = False
    
    async def start(self):
        """Start background task workers"""
        self.running = True
        for i in range(self.max_workers):
            worker = asyncio.create_task(self._worker(f"worker-{i}"))
            self.workers.append(worker)
    
    async def stop(self):
        """Stop background task workers"""
        self.running = False
        
        # Cancel all workers
        for worker in self.workers:
            worker.cancel()
        
        # Close thread pool
        self.thread_pool.shutdown(wait=True)
    
    async def _worker(self, name):
        """Background task worker"""
        while self.running:
            try:
                # Get task from queue with timeout
                task = await asyncio.wait_for(self.task_queue.get(), timeout=1.0)
                
                # Process task
                await self._process_task(task)
                
                # Mark task as done
                self.task_queue.task_done()
            
            except asyncio.TimeoutError:
                continue
            except Exception as e:
                print(f"Worker {name} error: {e}")
    
    async def _process_task(self, task):
        """Process individual task"""
        task_type = task['type']
        task_data = task['data']
        
        if task_type == 'send_email':
            await self._send_email(task_data)
        elif task_type == 'process_image':
            await self._process_image(task_data)
        elif task_type == 'generate_report':
            await self._generate_report(task_data)
        elif task_type == 'cpu_intensive':
            # Run CPU-intensive task in thread pool
            loop = asyncio.get_event_loop()
            await loop.run_in_executor(
                self.thread_pool,
                self._cpu_intensive_task,
                task_data
            )
    
    async def queue_task(self, task_type, data):
        """Queue a background task"""
        task = {'type': task_type, 'data': data}
        await self.task_queue.put(task)
    
    async def _send_email(self, email_data):
        """Send email asynchronously"""
        # Simulate email sending
        await asyncio.sleep(0.1)
        print(f"Email sent to {email_data['to']}")
    
    async def _process_image(self, image_data):
        """Process image asynchronously"""
        # Simulate image processing
        await asyncio.sleep(0.2)
        print(f"Processed image {image_data['filename']}")
    
    async def _generate_report(self, report_data):
        """Generate report asynchronously"""
        # Simulate report generation
        await asyncio.sleep(0.5)
        print(f"Generated report {report_data['type']}")
    
    def _cpu_intensive_task(self, data):
        """CPU-intensive task that runs in thread pool"""
        # Simulate CPU-intensive work
        import time
        time.sleep(0.1)
        return f"Processed {data}"

# Global task processor
task_processor = BackgroundTaskProcessor()

# Usage in routes
@app.route('/send-notification')
async def send_notification(context):
    user_id = context.json['user_id']
    message = context.json['message']
    
    # Queue background task instead of blocking
    await task_processor.queue_task('send_email', {
        'to': f'user_{user_id}@example.com',
        'subject': 'Notification',
        'body': message
    })
    
    return {'status': 200, 'message': 'Notification queued'}

Memory Management

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import gc
import weakref
from typing import Dict, Any
import psutil

class MemoryManager:
    """Monitor and manage application memory usage"""
    
    def __init__(self, max_memory_mb=1024):
        self.max_memory_mb = max_memory_mb
        self.tracked_objects = weakref.WeakSet()
        self.cache_size_limit = 1000
    
    def track_object(self, obj):
        """Track object for memory monitoring"""
        self.tracked_objects.add(obj)
    
    def get_memory_usage(self):
        """Get current memory usage"""
        process = psutil.Process()
        memory_info = process.memory_info()
        
        return {
            'rss_mb': memory_info.rss / 1024 / 1024,
            'vms_mb': memory_info.vms / 1024 / 1024,
            'percent': process.memory_percent(),
            'tracked_objects': len(self.tracked_objects)
        }
    
    def should_cleanup(self):
        """Check if memory cleanup is needed"""
        memory_usage = self.get_memory_usage()
        return memory_usage['rss_mb'] > self.max_memory_mb
    
    def cleanup_memory(self):
        """Perform memory cleanup"""
        # Force garbage collection
        collected = gc.collect()
        
        # Clear various caches
        self._clear_component_cache()
        self._clear_query_cache()
        
        memory_after = self.get_memory_usage()
        
        return {
            'objects_collected': collected,
            'memory_after_mb': memory_after['rss_mb']
        }
    
    def _clear_component_cache(self):
        """Clear component render cache"""
        from your_components import MemoizedComponent
        if hasattr(MemoizedComponent, '_render_cache'):
            cache_size = len(MemoizedComponent._render_cache)
            if cache_size > self.cache_size_limit:
                # Keep only most recent 50% of cache
                items = list(MemoizedComponent._render_cache.items())
                items_to_keep = items[cache_size // 2:]
                MemoizedComponent._render_cache = dict(items_to_keep)
    
    def _clear_query_cache(self):
        """Clear database query cache"""
        # Implementation depends on your ORM
        pass

# Memory monitoring middleware
memory_manager = MemoryManager(max_memory_mb=2048)

async def memory_monitoring_middleware(context, call_next):
    """Monitor memory usage per request"""
    
    response = await call_next(context)
    
    # Check memory usage after request
    if memory_manager.should_cleanup():
        cleanup_result = memory_manager.cleanup_memory()
        print(f"Memory cleanup: {cleanup_result}")
    
    return response

📈 Performance Monitoring

Application Performance Monitoring

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import time
import statistics
from collections import defaultdict, deque

class APMCollector:
    """Application Performance Monitoring collector"""
    
    def __init__(self, max_samples=1000):
        self.max_samples = max_samples
        self.metrics = defaultdict(lambda: deque(maxlen=max_samples))
        self.counters = defaultdict(int)
        self.start_time = time.time()
    
    def record_metric(self, name, value, tags=None):
        """Record a performance metric"""
        metric_key = f"{name}:{':'.join(tags or [])}"
        self.metrics[metric_key].append({
            'value': value,
            'timestamp': time.time()
        })
    
    def increment_counter(self, name, tags=None):
        """Increment a counter metric"""
        counter_key = f"{name}:{':'.join(tags or [])}"
        self.counters[counter_key] += 1
    
    def get_metric_stats(self, name, tags=None):
        """Get statistics for a metric"""
        metric_key = f"{name}:{':'.join(tags or [])}"
        values = [m['value'] for m in self.metrics[metric_key]]
        
        if not values:
            return None
        
        return {
            'count': len(values),
            'min': min(values),
            'max': max(values),
            'mean': statistics.mean(values),
            'median': statistics.median(values),
            'p95': self._percentile(values, 95),
            'p99': self._percentile(values, 99)
        }
    
    def _percentile(self, values, percentile):
        """Calculate percentile"""
        if not values:
            return 0
        
        sorted_values = sorted(values)
        index = int((percentile / 100) * len(sorted_values))
        return sorted_values[min(index, len(sorted_values) - 1)]
    
    def get_dashboard_data(self):
        """Get data for performance dashboard"""
        uptime = time.time() - self.start_time
        
        dashboard = {
            'uptime_seconds': uptime,
            'counters': dict(self.counters),
            'metrics': {}
        }
        
        for metric_name in self.metrics:
            dashboard['metrics'][metric_name] = self.get_metric_stats(metric_name)
        
        return dashboard

# Global APM instance
apm = APMCollector()

# Performance decorators
def monitor_performance(metric_name=None, tags=None):
    """Decorator to monitor function performance"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            name = metric_name or f"{func.__module__}.{func.__name__}"
            start_time = time.time()
            
            try:
                result = await func(*args, **kwargs)
                apm.increment_counter('function_calls', tags=['status:success'] + (tags or []))
                return result
            except Exception as e:
                apm.increment_counter('function_calls', tags=['status:error'] + (tags or []))
                raise
            finally:
                execution_time = time.time() - start_time
                apm.record_metric('execution_time', execution_time, tags=[f'function:{name}'] + (tags or []))
        
        return wrapper
    return decorator

# Usage
@monitor_performance('database.user_query', ['operation:select'])
async def get_user_by_id(user_id):
    # Database query implementation
    return await User.get(id=user_id)

# Performance dashboard route
@app.route('/admin/performance')
async def performance_dashboard(context):
    dashboard_data = apm.get_dashboard_data()
    
    return {
        'status': 200,
        'headers': {'Content-Type': 'application/json'},
        'body': json.dumps(dashboard_data)
    }

Follow these optimization techniques to build blazing-fast PyFrame applications that can handle high traffic and large datasets efficiently! ⚡