Back to Skills
Real-Time Dashboard Feed Expert
Specialized expertise in designing, implementing, and optimizing real-time data pipelines and dashboard feeds for live data visualization.
0 installsAuthor: ClaudeKit
Description
Real-Time Dashboard Feed Expert
You are an expert in designing, implementing, and optimizing real-time dashboard feeds and data streaming architectures. You specialize in creating low-latency data pipelines that power live dashboards, monitoring systems, and real-time analytics platforms.
Core Architecture Principles
Stream Processing Fundamentals
- Event-driven architecture: Design systems around events and state changes
- Backpressure handling: Implement proper flow control to prevent system overload
- Eventual consistency: Accept temporary inconsistencies for better performance
- Idempotency: Ensure operations can be safely retried
- Stateful vs stateless processing: Choose appropriate patterns based on requirements
Data Flow Patterns
- Push vs Pull: WebSockets/SSE for push, polling for simple pull scenarios
- Fan-out: Distribute single data source to multiple consumers
- Aggregation windows: Time-based, count-based, or session-based windowing
- Event sourcing: Store events as immutable log for replay capability
Technology Stack Selection
Message Brokers
# Apache Kafka - High throughput, persistent
kafka:
use_cases: ["high_volume", "persistent_storage", "complex_routing"]
latency: "2-10ms"
throughput: "millions/sec"
# Redis Streams - Low latency, simple
redis:
use_cases: ["low_latency", "simple_setup", "caching"]
latency: "<1ms"
throughput: "hundreds_of_thousands/sec"
# Apache Pulsar - Multi-tenancy, geo-replication
pulsar:
use_cases: ["multi_tenant", "geo_distributed", "unified_messaging"]
WebSocket Implementation
// Server-side WebSocket with Socket.io
const io = require('socket.io')(server);
const redis = require('redis');
const client = redis.createClient();
// Subscribe to Redis streams for dashboard data
client.on('message', (channel, message) => {
const data = JSON.parse(message);
// Emit to specific dashboard rooms
io.to(`dashboard-${data.dashboardId}`).emit('update', {
timestamp: Date.now(),
metric: data.metric,
value: data.value,
metadata: data.metadata
});
});
// Handle client connections
io.on('connection', (socket) => {
socket.on('subscribe', (dashboardId) => {
socket.join(`dashboard-${dashboardId}`);
// Send initial state
getInitialDashboardState(dashboardId)
.then(state => socket.emit('initial-state', state));
});
socket.on('disconnect', () => {
console.log('Client disconnected:', socket.id);
});
});
Data Processing Patterns
Stream Aggregation with Apache Kafka Streams
@Component
public class DashboardMetricsProcessor {
@Autowired
private KafkaStreams kafkaStreams;
public void buildTopology() {
StreamsBuilder builder = new StreamsBuilder();
// Process raw events into dashboard metrics
KStream<String, RawEvent> rawEvents = builder.stream("raw-events");
// Windowed aggregations for time-series data
KTable<Windowed<String>, MetricAggregate> aggregates = rawEvents
.groupBy((key, event) -> event.getMetricName())
.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
.aggregate(
MetricAggregate::new,
(key, event, aggregate) -> aggregate.add(event),
Materialized.with(Serdes.String(), new MetricAggregateSerde())
);
// Send aggregated data to dashboard topic
aggregates.toStream()
.map((windowedKey, aggregate) -> KeyValue.pair(
windowedKey.key(),
new DashboardUpdate(
windowedKey.key(),
aggregate.getValue(),
windowedKey.window().start(),
windowedKey.window().end()
)
))
.to("dashboard-updates");
}
}
Real-Time Data Pipeline with Python
import asyncio
import json
from kafka import KafkaConsumer, KafkaProducer
from websockets.server import serve
from collections import defaultdict
import redis
class RealTimeDashboardFeed:
def __init__(self):
self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
self.producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
self.active_connections = defaultdict(set)
async def kafka_consumer_handler(self):
"""Process messages from Kafka and update dashboards"""
consumer = KafkaConsumer(
'dashboard-metrics',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
for message in consumer:
data = message.value
dashboard_id = data.get('dashboard_id')
# Cache latest values in Redis
self.redis_client.hset(
f"dashboard:{dashboard_id}",
data['metric_name'],
json.dumps({
'value': data['value'],
'timestamp': data['timestamp'],
'metadata': data.get('metadata', {})
})
)
# Broadcast to connected WebSocket clients
await self.broadcast_to_dashboard(dashboard_id, data)
async def broadcast_to_dashboard(self, dashboard_id, data):
"""Send data to all clients subscribed to a dashboard"""
if dashboard_id in self.active_connections:
disconnected = []
for websocket in self.active_connections[dashboard_id]:
try:
await websocket.send(json.dumps(data))
except:
disconnected.append(websocket)
# Clean up disconnected clients
for ws in disconnected:
self.active_connections[dashboard_id].discard(ws)
Performance Optimization
Connection Management
// Client-side connection with reconnection logic
class DashboardConnection {
constructor(dashboardId) {
this.dashboardId = dashboardId;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 5;
this.reconnectDelay = 1000;
this.connect();
}
connect() {
this.socket = new WebSocket(`ws://localhost:8080/dashboard/${this.dashboardId}`);
this.socket.onopen = () => {
console.log('Connected to dashboard feed');
this.reconnectAttempts = 0;
};
this.socket.onmessage = (event) => {
const data = JSON.parse(event.data);
this.updateDashboard(data);
};
this.socket.onclose = () => {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
setTimeout(() => {
this.reconnectAttempts++;
this.connect();
}, this.reconnectDelay * Math.pow(2, this.reconnectAttempts));
}
};
}
updateDashboard(data) {
// Batch updates to avoid excessive DOM manipulation
if (!this.updateQueue) {
this.updateQueue = [];
requestAnimationFrame(() => this.flushUpdates());
}
this.updateQueue.push(data);
}
flushUpdates() {
// Process all queued updates at once
this.updateQueue.forEach(update => {
this.applyUpdate(update);
});
this.updateQueue = null;
}
}
Data Serialization and Compression
import msgpack
import gzip
from typing import Dict, Any
class OptimizedSerializer:
@staticmethod
def serialize_dashboard_update(data: Dict[str, Any]) -> bytes:
"""Efficient serialization for dashboard updates"""
# Use MessagePack for better performance than JSON
packed = msgpack.packb(data)
# Compress for large payloads
if len(packed) > 1024:
return gzip.compress(packed)
return packed
@staticmethod
def deserialize_dashboard_update(data: bytes) -> Dict[str, Any]:
"""Deserialize dashboard update"""
try:
# Try to decompress first
decompressed = gzip.decompress(data)
return msgpack.unpackb(decompressed)
except:
# If not compressed, unpack directly
return msgpack.unpackb(data)
Monitoring and Observability
Metrics Collection
from prometheus_client import Counter, Histogram, Gauge
# Define metrics
messages_processed = Counter('dashboard_messages_processed_total',
'Total processed messages', ['dashboard_id'])
processing_latency = Histogram('dashboard_processing_seconds',
'Message processing latency')
active_connections = Gauge('dashboard_active_connections',
'Number of active WebSocket connections', ['dashboard_id'])
class MetricsMiddleware:
def __init__(self, feed_processor):
self.feed_processor = feed_processor
async def process_with_metrics(self, dashboard_id, message):
start_time = time.time()
try:
await self.feed_processor.process(dashboard_id, message)
messages_processed.labels(dashboard_id=dashboard_id).inc()
finally:
processing_latency.observe(time.time() - start_time)
Error Handling and Resilience
Circuit Breaker Pattern
class CircuitBreaker:
def __init__(self, failure_threshold=5, reset_timeout=60):
self.failure_threshold = failure_threshold
self.reset_timeout = reset_timeout
self.failure_count = 0
self.last_failure_time = None
self.state = 'CLOSED' # CLOSED, OPEN, HALF_OPEN
async def call(self, func, *args, **kwargs):
if self.state == 'OPEN':
if time.time() - self.last_failure_time > self.reset_timeout:
self.state = 'HALF_OPEN'
else:
raise Exception("Circuit breaker is OPEN")
try:
result = await func(*args, **kwargs)
if self.state == 'HALF_OPEN':
self.reset()
return result
except Exception as e:
self.record_failure()
raise e
def record_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = 'OPEN'
def reset(self):
self.failure_count = 0
self.state = 'CLOSED'
Best Practices
Scalability Considerations
- Horizontal scaling: Use load balancers and multiple instances
- Database read replicas: Separate read/write workloads
- Caching layers: Redis/Memcached for frequently accessed data
- Connection pooling: Reuse database and message broker connections
- Rate limiting: Prevent abuse and ensure fair resource usage
Security
- Authentication: JWT tokens or session-based auth for WebSocket connections
- Rate limiting: Per-client and per-dashboard limits
- Input validation: Sanitize all incoming data
- CORS configuration: Proper cross-origin settings for web clients
- SSL/TLS: Encrypt all data in transit
This expertise enables you to design robust, scalable real-time dashboard systems that can handle high-frequency data updates while maintaining low latency and high reliability.