
Building Python Data Pipelines for Visivo Dashboards
CEO & Co-founder of Visivo
Learn how to build robust Python data pipelines that integrate seamlessly with Visivo's BI-as-code approach for powerful, automated analytics workflows.

Python's rich ecosystem combined with Visivo's BI-as-code approach creates a powerful synergy for data teams. According to Grand View Research, "The global data analytics market will reach $684.12 billion by 2030," driven largely by automated data processing capabilities.
By building Python data pipelines that feed directly into Visivo dashboards, you can create fully automated analytics workflows that transform raw data into actionable insights with minimal manual intervention.
This guide explores how to build production-ready Python pipelines that integrate seamlessly with Visivo's YAML-based dashboard configuration, enabling version-controlled, testable, and scalable analytics infrastructure. That matters because, as VentureBeat reports, "87% of data science projects never make it to production," often due to a lack of proper pipeline infrastructure.
The Python + Visivo Integration Architecture
Visivo's command-based models enable Python scripts to feed data directly into your dashboards. This architecture separates data processing logic (Python) from visualization configuration (YAML), creating maintainable, reusable analytics systems that align with modern data stack principles.
Core Integration Pattern
# scripts/generate_sales_data.py
import pandas as pd
import psycopg2
from datetime import datetime, timedelta
import json

def extract_sales_data():
    """Extract raw sales data from multiple sources"""
    # The helpers below are placeholders for your own extraction logic
    # API data extraction
    api_data = fetch_from_crm_api()
    # Database extraction
    db_data = fetch_from_transactional_db()
    # File processing
    file_data = process_uploaded_files()
    return combine_data_sources(api_data, db_data, file_data)

def transform_for_analytics(raw_data):
    """Apply business logic and aggregations"""
    df = pd.DataFrame(raw_data)
    # Clean and standardize
    df['sale_date'] = pd.to_datetime(df['sale_date'])
    df['revenue'] = df['quantity'] * df['unit_price']
    # Calculate business metrics
    daily_summary = df.groupby(['sale_date', 'region']).agg({
        'revenue': 'sum',
        'quantity': 'sum',
        'order_id': 'nunique'
    }).reset_index()
    daily_summary.columns = ['date', 'region', 'total_revenue',
                             'total_quantity', 'order_count']
    return daily_summary

def output_for_visivo(processed_data):
    """Format data for Visivo consumption"""
    # Output as CSV for Visivo model consumption
    print(processed_data.to_csv(index=False))

if __name__ == "__main__":
    raw_data = extract_sales_data()
    processed_data = transform_for_analytics(raw_data)
    output_for_visivo(processed_data)
This Python script processes the data and writes CSV to stdout, which Visivo can consume directly through a command-based model:
# project.visivo.yml
models:
  - name: daily_sales_metrics
    table_name: sales_data
    args:
      - python
      - scripts/generate_sales_data.py
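The extraction helpers referenced in extract_sales_data() are placeholders for your own source systems. A minimal sketch, assuming static sample records, makes the script runnable end to end for local testing; swap these stubs for real API, database, and file logic:

# scripts/generate_sales_data.py (placeholder implementations for local testing)
# These stubs return hard-coded sample rows; replace them with real extraction code.

def fetch_from_crm_api():
    return [{'sale_date': '2024-01-01', 'quantity': 5, 'unit_price': 100.0,
             'region': 'US', 'order_id': 'ORD001'}]

def fetch_from_transactional_db():
    return [{'sale_date': '2024-01-01', 'quantity': 3, 'unit_price': 75.0,
             'region': 'EU', 'order_id': 'ORD002'}]

def process_uploaded_files():
    return []  # e.g., rows parsed from CSVs dropped into data/uploads/

def combine_data_sources(*sources):
    # Flatten the per-source lists into a single list of records
    return [record for source in sources for record in source]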
Advanced Data Pipeline Patterns
1. Multi-Source Data Integration
Real-world analytics often requires combining data from multiple systems, and Python excels at that orchestration. The same pattern supports DuckDB dashboard visualization and other advanced data processing workflows:
# scripts/multi_source_pipeline.py
import asyncio
import os

import aiohttp
import pandas as pd
from sqlalchemy import create_engine

class MultiSourcePipeline:
    def __init__(self, config):
        self.config = config
        self.db_engine = create_engine(config['database_url'])

    async def fetch_api_data(self, session, endpoint):
        """Async API data fetching"""
        headers = {'Authorization': f"Bearer {self.config['api_token']}"}
        async with session.get(endpoint, headers=headers) as response:
            return await response.json()

    def fetch_database_metrics(self):
        """Extract from operational database"""
        query = """
            SELECT
                DATE(created_at) as metric_date,
                product_category,
                COUNT(*) as transaction_count,
                SUM(amount) as total_amount
            FROM transactions
            WHERE created_at >= CURRENT_DATE - INTERVAL '30 days'
            GROUP BY DATE(created_at), product_category
            ORDER BY metric_date DESC
        """
        return pd.read_sql(query, self.db_engine)

    def process_csv_uploads(self):
        """Handle manual CSV data uploads"""
        csv_files = ['data/external_metrics.csv', 'data/partner_data.csv']
        combined_df = pd.concat([
            pd.read_csv(file) for file in csv_files if os.path.exists(file)
        ])
        return combined_df

    async def run_pipeline(self):
        """Orchestrate the complete data pipeline"""
        # Parallel API calls
        async with aiohttp.ClientSession() as session:
            api_tasks = [
                self.fetch_api_data(session, 'https://api.stripe.com/charges'),
                self.fetch_api_data(session, 'https://api.hubspot.com/contacts'),
                self.fetch_api_data(session, 'https://api.salesforce.com/leads')
            ]
            api_results = await asyncio.gather(*api_tasks)
        # Database extraction
        db_metrics = self.fetch_database_metrics()
        # File processing
        csv_data = self.process_csv_uploads()
        # Combine and transform
        unified_data = self.merge_all_sources(api_results, db_metrics, csv_data)
        # Output for Visivo
        return self.format_for_dashboard(unified_data)

    def merge_all_sources(self, api_data, db_data, csv_data):
        """Combine data from all sources into unified schema"""
        # Normalize API data
        stripe_df = pd.json_normalize(api_data[0]['data'])
        hubspot_df = pd.json_normalize(api_data[1]['results'])
        # Apply business logic to unify schemas
        unified_df = self.apply_business_rules(stripe_df, hubspot_df, db_data, csv_data)
        return unified_df

    def format_for_dashboard(self, data):
        """Final formatting for Visivo consumption"""
        # Calculate KPIs and metrics for dashboard
        dashboard_ready = data.groupby(['date', 'source']).agg({
            'revenue': 'sum',
            'customer_count': 'nunique',
            'conversion_rate': 'mean'
        }).reset_index()
        print(dashboard_ready.to_csv(index=False))

# Entry point for Visivo
if __name__ == "__main__":
    config = {
        'database_url': os.environ['DATABASE_URL'],
        'api_token': os.environ['API_TOKEN']
    }
    pipeline = MultiSourcePipeline(config)
    asyncio.run(pipeline.run_pipeline())
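apply_business_rules() is left undefined above because it is entirely business-specific. As a hedged sketch, one approach maps each source onto a shared date/source/revenue/customer_count/conversion_rate schema and concatenates; the source field names below (Stripe's created, amount, and customer, and the columns from fetch_database_metrics) are assumptions to adapt:

# Hypothetical schema-unification method to add to MultiSourcePipeline.
# Adapt the column mappings to the fields your APIs and tables actually return.
import pandas as pd

def apply_business_rules(self, stripe_df, hubspot_df, db_data, csv_data):
    frames = []

    if not stripe_df.empty:
        # Stripe charges: epoch-second timestamps, amounts in cents (assumed)
        frames.append(pd.DataFrame({
            'date': pd.to_datetime(stripe_df['created'], unit='s').dt.date,
            'source': 'stripe',
            'revenue': stripe_df['amount'] / 100.0,
            'customer_count': stripe_df['customer'],
            'conversion_rate': 1.0,
        }))

    if not db_data.empty:
        # Operational database metrics (schema from fetch_database_metrics)
        frames.append(pd.DataFrame({
            'date': db_data['metric_date'],
            'source': 'transactions_db',
            'revenue': db_data['total_amount'],
            'customer_count': db_data['transaction_count'],
            'conversion_rate': 1.0,
        }))

    # hubspot_df and csv_data would be mapped the same way; anything that
    # cannot be aligned to the unified schema is dropped here.
    return pd.concat(frames, ignore_index=True)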
The corresponding Visivo configuration becomes elegantly simple:
models:
  - name: unified_business_metrics
    table_name: business_data
    args:
      - python
      - scripts/multi_source_pipeline.py

traces:
  - name: revenue_trend
    model: ${ref(unified_business_metrics)}
    columns:
      x: date
      y: revenue
      color: source
    props:
      type: scatter
      mode: lines+markers
      x: column(x)
      y: column(y)
      color: column(color)

charts:
  - name: unified_metrics_dashboard
    traces:
      - ${ref(revenue_trend)}
    layout:
      title:
        text: "Multi-Source Revenue Analysis"
      xaxis:
        title: "Date"
      yaxis:
        title: "Revenue ($)"
        tickformat: "$,.0f"
2. Real-Time Data Processing with Python
For near real-time analytics, Python can process streaming data and update Visivo models:
# scripts/streaming_processor.py
import json
import os
import time
from datetime import datetime

import pandas as pd
import redis

class StreamingProcessor:
    def __init__(self):
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
        self.batch_size = 1000
        self.processing_window = 60  # seconds

    def process_event_stream(self):
        """Process real-time events and aggregate for dashboard"""
        events = []
        start_time = time.time()
        while True:
            # Read from Redis stream or queue
            event = self.redis_client.blpop(['events'], timeout=1)
            if event:
                event_data = json.loads(event[1])
                events.append(event_data)
            # Process batch when size or time threshold reached
            current_time = time.time()
            if (len(events) >= self.batch_size or
                    current_time - start_time >= self.processing_window):
                if events:
                    self.process_event_batch(events)
                    events = []
                start_time = current_time

    def process_event_batch(self, events):
        """Transform event batch into dashboard metrics"""
        df = pd.DataFrame(events)
        # Real-time aggregations
        metrics = df.groupby(['timestamp_hour', 'event_type']).agg({
            'user_id': 'nunique',
            'revenue': 'sum',
            'conversion': 'mean'
        }).reset_index()
        # Update dashboard data
        self.update_dashboard_data(metrics)

    def update_dashboard_data(self, metrics):
        """Write processed metrics for Visivo consumption"""
        # Append to existing data or overwrite
        output_file = 'data/realtime_metrics.csv'
        if os.path.exists(output_file):
            existing = pd.read_csv(output_file)
            combined = pd.concat([existing, metrics]).drop_duplicates()
        else:
            combined = metrics
        combined.to_csv(output_file, index=False)
        # Trigger Visivo refresh (if using file watching)
        touch_file = 'data/.refresh_trigger'
        with open(touch_file, 'w') as f:
            f.write(str(datetime.now()))

if __name__ == "__main__":
    processor = StreamingProcessor()
    processor.process_event_stream()
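To exercise the processor locally, you can push synthetic events onto the same Redis list it reads from. A small sketch, assuming event fields that mirror the aggregation above:

# scripts/produce_test_events.py (hypothetical helper for local testing)
import json
import random
import time
from datetime import datetime

import redis

r = redis.Redis(host='localhost', port=6379, db=0)

for i in range(50):
    event = {
        'timestamp_hour': datetime.now().strftime('%Y-%m-%d %H:00'),
        'event_type': random.choice(['purchase', 'signup']),
        'user_id': f"user_{random.randint(1, 10)}",
        'revenue': round(random.uniform(0, 200), 2),
        'conversion': random.choice([0, 1]),
    }
    # RPUSH pairs with the processor's BLPOP to form a simple FIFO queue
    r.rpush('events', json.dumps(event))
    time.sleep(0.1)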
Configure Visivo to consume the real-time data:
models:
  - name: realtime_metrics
    table_name: live_data
    args:
      - cat
      - data/realtime_metrics.csv

traces:
  - name: live_conversions
    model: ${ref(realtime_metrics)}
    columns:
      x: timestamp_hour
      y: conversion
      color: event_type
    props:
      type: scatter
      mode: lines
      x: column(x)
      y: column(y)
      color: column(color)
Data Quality and Testing Integration
Python's testing ecosystem integrates seamlessly with Visivo's testing framework:
1. Data Pipeline Testing
# tests/test_pipeline_quality.py
import pandas as pd
import pytest

from scripts.generate_sales_data import transform_for_analytics
from visivo.testing import get_model_data

class TestDataPipeline:
    def test_data_completeness(self):
        """Ensure pipeline produces complete data"""
        # Test the Python pipeline directly
        sample_data = self.create_sample_data()
        result = transform_for_analytics(sample_data)
        # Verify no missing critical fields
        assert not result['date'].isna().any()
        assert not result['total_revenue'].isna().any()
        assert result['total_revenue'].min() >= 0

    def test_business_logic(self):
        """Verify business calculations are correct"""
        sample_data = [{
            'sale_date': '2024-01-01',
            'quantity': 10,
            'unit_price': 50.00,
            'region': 'US',
            'order_id': 'ORD001'
        }]
        result = transform_for_analytics(sample_data)
        expected_revenue = 10 * 50.00
        assert result.iloc[0]['total_revenue'] == expected_revenue

    def test_visivo_model_output(self):
        """Test the actual Visivo model data"""
        data = get_model_data("daily_sales_metrics")
        # Verify data structure matches dashboard expectations
        required_columns = ['date', 'region', 'total_revenue', 'order_count']
        for col in required_columns:
            assert col in data.columns
        # Business rules validation
        assert data['total_revenue'].min() >= 0
        assert data['order_count'].min() > 0

    def test_data_freshness(self):
        """Ensure data is recent enough for business needs"""
        from datetime import datetime

        data = get_model_data("daily_sales_metrics")
        latest_date = pd.to_datetime(data['date']).max()
        days_old = (datetime.now() - latest_date).days
        # Data should be no more than 2 days old
        assert days_old <= 2, f"Data is {days_old} days old"

    def create_sample_data(self):
        """Create test data for pipeline testing"""
        return [
            {
                'sale_date': '2024-01-01',
                'quantity': 5,
                'unit_price': 100.00,
                'region': 'US',
                'order_id': 'ORD001'
            },
            {
                'sale_date': '2024-01-01',
                'quantity': 3,
                'unit_price': 75.00,
                'region': 'EU',
                'order_id': 'ORD002'
            }
        ]
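For the scripts.* imports above to resolve when pytest runs from the repository root, the root directory usually needs to be on sys.path (unless the project is installed as a package). A minimal tests/conftest.py sketch:

# tests/conftest.py (hypothetical; puts the repo root on sys.path for imports)
import os
import sys

sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))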
2. Automated Data Validation
# scripts/data_validator.py
import sys
from datetime import datetime, timedelta

import pandas as pd

from scripts.generate_sales_data import (
    extract_sales_data,
    output_for_visivo,
    transform_for_analytics,
)

class DataValidator:
    def __init__(self, data_source):
        self.data = pd.read_csv(data_source) if isinstance(data_source, str) else data_source
        self.errors = []

    def validate_schema(self, expected_columns):
        """Validate data schema matches expectations"""
        missing_cols = set(expected_columns) - set(self.data.columns)
        if missing_cols:
            self.errors.append(f"Missing columns: {missing_cols}")

    def validate_business_rules(self):
        """Apply business-specific validation rules"""
        # Revenue must be positive
        if (self.data['total_revenue'] < 0).any():
            self.errors.append("Found negative revenue values")
        # Dates must be reasonable
        min_date = datetime.now() - timedelta(days=365)
        max_date = datetime.now() + timedelta(days=1)
        date_col = pd.to_datetime(self.data['date'])
        if (date_col < min_date).any() or (date_col > max_date).any():
            self.errors.append("Found unreasonable dates")

    def validate_data_quality(self):
        """Check data quality metrics"""
        # Check for duplicate records
        if self.data.duplicated().any():
            self.errors.append("Found duplicate records")
        # Check for excessive nulls
        null_pct = self.data.isnull().mean()
        high_null_cols = null_pct[null_pct > 0.1].index.tolist()
        if high_null_cols:
            self.errors.append(f"High null percentage in: {high_null_cols}")

    def run_validation(self):
        """Run all validation checks"""
        expected_cols = ['date', 'region', 'total_revenue', 'order_count']
        self.validate_schema(expected_cols)
        self.validate_business_rules()
        self.validate_data_quality()
        if self.errors:
            print(f"VALIDATION FAILED: {len(self.errors)} errors found")
            for error in self.errors:
                print(f"  - {error}")
            sys.exit(1)
        else:
            print("Data validation passed")
        return len(self.errors) == 0

# Integration with pipeline
def validated_pipeline():
    """Pipeline with built-in validation"""
    raw_data = extract_sales_data()
    processed_data = transform_for_analytics(raw_data)
    # Validate before output
    validator = DataValidator(processed_data)
    if validator.run_validation():
        output_for_visivo(processed_data)
    else:
        sys.exit(1)

if __name__ == "__main__":
    validated_pipeline()
Production Deployment Patterns
1. Dockerized Pipeline Environment
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
    postgresql-client \
    curl \
    && rm -rf /var/lib/apt/lists/*
# Install Python dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt
# Install Visivo CLI
RUN pip install visivo
# Copy pipeline scripts
COPY scripts/ ./scripts/
COPY project.visivo.yml .
COPY .env.example .env
# Run pipeline and serve dashboard
CMD ["sh", "-c", "python scripts/generate_sales_data.py && visivo serve --host 0.0.0.0"]
# docker-compose.yml
version: '3.8'
services:
  analytics-pipeline:
    build: .
    ports:
      - "8080:8080"
    environment:
      - DATABASE_URL=${DATABASE_URL}
      - API_TOKEN=${API_TOKEN}
    volumes:
      - ./data:/app/data
      - ./scripts:/app/scripts
    depends_on:
      - postgres

  postgres:
    image: postgres:14
    environment:
      POSTGRES_DB: analytics
      POSTGRES_USER: analytics_user
      POSTGRES_PASSWORD: secure_password
    volumes:
      - postgres_data:/var/lib/postgresql/data

volumes:
  postgres_data:
2. CI/CD Integration
# .github/workflows/analytics-pipeline.yml
name: Analytics Pipeline CI/CD

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  test-pipeline:
    runs-on: ubuntu-latest
    services:
      postgres:
        image: postgres:14
        env:
          POSTGRES_PASSWORD: test_password
          POSTGRES_DB: test_analytics
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install visivo
      - name: Run pipeline tests
        run: |
          pytest tests/ -v
      - name: Test Visivo compilation
        run: |
          visivo compile --validate
      - name: Run data quality tests
        run: |
          visivo test

  deploy-staging:
    needs: test-pipeline
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Deploy to staging
        run: |
          # Run pipeline with staging data
          python scripts/generate_sales_data.py
          # Deploy to Visivo staging
          visivo deploy -s staging
Advanced Analytics Patterns
1. Time Series Forecasting Pipeline
# scripts/forecasting_pipeline.py
import os
import sys
from datetime import timedelta

import joblib
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error

class ForecastingPipeline:
    def __init__(self):
        self.model = RandomForestRegressor(n_estimators=100, random_state=42)
        self.feature_columns = ['day_of_week', 'month', 'is_weekend', 'lag_7', 'lag_30']

    def create_features(self, df):
        """Engineer features for forecasting"""
        df['date'] = pd.to_datetime(df['date'])
        df = df.sort_values('date')
        # Time-based features
        df['day_of_week'] = df['date'].dt.dayofweek
        df['month'] = df['date'].dt.month
        df['is_weekend'] = (df['day_of_week'] >= 5).astype(int)
        # Lag features
        df['lag_7'] = df['revenue'].shift(7)
        df['lag_30'] = df['revenue'].shift(30)
        # Rolling averages
        df['ma_7'] = df['revenue'].rolling(window=7).mean()
        df['ma_30'] = df['revenue'].rolling(window=30).mean()
        return df.dropna()

    def train_model(self, historical_data):
        """Train forecasting model on historical data"""
        df = self.create_features(historical_data)
        X = df[self.feature_columns]
        y = df['revenue']
        # Split for validation
        split_date = df['date'].max() - timedelta(days=30)
        train_mask = df['date'] <= split_date
        X_train, X_val = X[train_mask], X[~train_mask]
        y_train, y_val = y[train_mask], y[~train_mask]
        # Train model
        self.model.fit(X_train, y_train)
        # Validate
        val_pred = self.model.predict(X_val)
        mae = mean_absolute_error(y_val, val_pred)
        print(f"Model validation MAE: ${mae:,.0f}")
        # Save model
        os.makedirs('models', exist_ok=True)
        joblib.dump(self.model, 'models/revenue_forecast.pkl')

    def generate_forecasts(self, historical_data, forecast_days=30):
        """Generate future forecasts"""
        df = self.create_features(historical_data)
        # Load trained model
        self.model = joblib.load('models/revenue_forecast.pkl')
        forecasts = []
        last_date = df['date'].max()
        for i in range(1, forecast_days + 1):
            forecast_date = last_date + timedelta(days=i)
            # Create features for forecast date
            features = {
                'day_of_week': forecast_date.dayofweek,
                'month': forecast_date.month,
                'is_weekend': int(forecast_date.weekday() >= 5),
                'lag_7': df['revenue'].iloc[-7] if len(df) >= 7 else df['revenue'].mean(),
                'lag_30': df['revenue'].iloc[-30] if len(df) >= 30 else df['revenue'].mean()
            }
            feature_array = np.array([[features[col] for col in self.feature_columns]])
            prediction = self.model.predict(feature_array)[0]
            forecasts.append({
                'date': forecast_date.strftime('%Y-%m-%d'),
                'revenue_forecast': prediction,
                'is_forecast': True
            })
        return pd.DataFrame(forecasts)

    def output_combined_data(self, historical_data):
        """Combine historical and forecast data for Visivo"""
        # Historical data
        historical_df = historical_data.copy()
        historical_df['is_forecast'] = False
        historical_df['revenue_forecast'] = historical_df['revenue']
        # Generate forecasts
        forecast_df = self.generate_forecasts(historical_data)
        # Combine
        combined = pd.concat([
            historical_df[['date', 'revenue_forecast', 'is_forecast']],
            forecast_df
        ])
        print(combined.to_csv(index=False))

if __name__ == "__main__":
    # Load historical data
    historical_data = pd.read_csv('data/historical_revenue.csv')
    pipeline = ForecastingPipeline()
    # Train model (run periodically with the --train flag)
    if '--train' in sys.argv:
        pipeline.train_model(historical_data)
    # Generate forecasts for dashboard
    pipeline.output_combined_data(historical_data)
Visivo configuration for forecast visualization:
models:
  - name: revenue_with_forecasts
    table_name: forecast_data
    args:
      - python
      - scripts/forecasting_pipeline.py

traces:
  - name: historical_revenue
    model: ${ref(revenue_with_forecasts)}
    filters:
      - ?{is_forecast = false}
    columns:
      x: date
      y: revenue_forecast
    props:
      type: scatter
      mode: lines
      x: column(x)
      y: column(y)
      name: "Historical Revenue"
      line:
        color: '#2E86C1'
  - name: forecast_revenue
    model: ${ref(revenue_with_forecasts)}
    filters:
      - ?{is_forecast = true}
    columns:
      x: date
      y: revenue_forecast
    props:
      type: scatter
      mode: lines
      x: column(x)
      y: column(y)
      name: "Forecast"
      line:
        color: '#E74C3C'
        dash: 'dash'

charts:
  - name: revenue_forecast_chart
    traces:
      - ${ref(historical_revenue)}
      - ${ref(forecast_revenue)}
    layout:
      title:
        text: "Revenue Forecast - Next 30 Days"
      annotations:
        - x: "{{ current_date }}"
          y: 0
          yref: "paper"
          text: "Today"
          showarrow: true
Performance Optimization Strategies
1. Efficient Data Processing
# scripts/optimized_processor.py
import multiprocessing as mp
from concurrent.futures import ThreadPoolExecutor

import pandas as pd
import requests

class OptimizedProcessor:
    def __init__(self, chunk_size=10000, max_workers=None, headers=None):
        self.chunk_size = chunk_size
        self.max_workers = max_workers or mp.cpu_count()
        self.headers = headers or {}  # auth headers for API extraction

    def process_large_dataset(self, data_source):
        """Process large datasets efficiently using chunking"""
        # Use chunking for memory efficiency
        chunks = []
        for chunk in pd.read_csv(data_source, chunksize=self.chunk_size):
            processed_chunk = self.process_chunk(chunk)
            chunks.append(processed_chunk)
        # Combine results
        return pd.concat(chunks, ignore_index=True)

    def process_chunk(self, chunk):
        """Process individual data chunk"""
        # Vectorized operations for speed
        chunk['revenue'] = chunk['quantity'] * chunk['price']
        chunk['profit_margin'] = (chunk['revenue'] - chunk['cost']) / chunk['revenue']
        # Efficient aggregations
        return chunk.groupby(['date', 'product_id']).agg({
            'revenue': 'sum',
            'profit_margin': 'mean',
            'quantity': 'sum'
        }).reset_index()

    def parallel_api_extraction(self, endpoints):
        """Extract from multiple APIs in parallel"""
        def fetch_endpoint(endpoint):
            response = requests.get(endpoint, headers=self.headers)
            return response.json()

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            results = list(executor.map(fetch_endpoint, endpoints))
        return results

    def memory_efficient_aggregation(self, large_dataframe):
        """Perform aggregations without loading entire dataset in memory"""
        # Use categorical data types for memory efficiency
        large_dataframe['category'] = large_dataframe['category'].astype('category')
        large_dataframe['region'] = large_dataframe['region'].astype('category')
        # Efficient groupby operations
        result = large_dataframe.groupby(['date', 'category', 'region']).agg({
            'revenue': ['sum', 'mean', 'count'],
            'profit': 'sum'
        })
        # Flatten column names
        result.columns = ['_'.join(col).strip() for col in result.columns]
        return result.reset_index()

if __name__ == "__main__":
    processor = OptimizedProcessor()
    # Process large dataset efficiently
    result = processor.process_large_dataset('data/large_sales_data.csv')
    # Output for Visivo
    print(result.to_csv(index=False))
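To see why the categorical conversion in memory_efficient_aggregation() pays off, a quick comparison of DataFrame memory usage before and after the cast (the sample data here is synthetic):

# Illustrative memory comparison for the categorical-dtype optimization above
import numpy as np
import pandas as pd

rows = 1_000_000
df = pd.DataFrame({
    'region': np.random.choice(['US', 'EU', 'APAC'], size=rows),
    'revenue': np.random.rand(rows) * 100,
})

before_mb = df.memory_usage(deep=True).sum() / 1024 / 1024
df['region'] = df['region'].astype('category')
after_mb = df.memory_usage(deep=True).sum() / 1024 / 1024

# Low-cardinality string columns typically shrink by an order of magnitude
print(f"before: {before_mb:.1f} MB, after: {after_mb:.1f} MB")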
2. Caching and Incremental Processing
# scripts/incremental_processor.py
import hashlib
import os
import pickle
from datetime import datetime

import pandas as pd

class IncrementalProcessor:
    def __init__(self, cache_dir='cache', db_connection=None):
        self.cache_dir = cache_dir
        # SQLAlchemy engine or DB-API connection used by incremental_data_load
        self.db_connection = db_connection
        os.makedirs(cache_dir, exist_ok=True)

    def get_cache_key(self, query, params=None):
        """Generate cache key for query and parameters"""
        content = f"{query}_{params}".encode('utf-8')
        return hashlib.md5(content).hexdigest()

    def load_from_cache(self, cache_key):
        """Load processed data from cache"""
        cache_file = os.path.join(self.cache_dir, f"{cache_key}.pkl")
        if os.path.exists(cache_file):
            # Check if cache is still valid (e.g., less than 1 hour old)
            cache_age = datetime.now().timestamp() - os.path.getmtime(cache_file)
            if cache_age < 3600:  # 1 hour
                with open(cache_file, 'rb') as f:
                    return pickle.load(f)
        return None

    def save_to_cache(self, data, cache_key):
        """Save processed data to cache"""
        cache_file = os.path.join(self.cache_dir, f"{cache_key}.pkl")
        with open(cache_file, 'wb') as f:
            pickle.dump(data, f)

    def incremental_data_load(self, table_name, last_processed_date=None):
        """Load only new data since last processing"""
        if last_processed_date:
            query = f"""
                SELECT * FROM {table_name}
                WHERE updated_at > '{last_processed_date}'
                ORDER BY updated_at
            """
        else:
            query = f"SELECT * FROM {table_name} ORDER BY updated_at"
        cache_key = self.get_cache_key(query)
        cached_data = self.load_from_cache(cache_key)
        if cached_data is not None:
            return cached_data
        # Fetch new data
        new_data = pd.read_sql(query, self.db_connection)
        # Cache the result
        self.save_to_cache(new_data, cache_key)
        return new_data

    def merge_incremental_updates(self, existing_data, new_data):
        """Merge new data with existing processed data"""
        if new_data.empty:
            return existing_data
        # Remove duplicates and merge
        combined = pd.concat([existing_data, new_data])
        return combined.drop_duplicates(subset=['id'], keep='last')

if __name__ == "__main__":
    processor = IncrementalProcessor()
    # Load state from previous run
    state_file = 'state/last_processed.txt'
    last_processed = None
    if os.path.exists(state_file):
        with open(state_file, 'r') as f:
            last_processed = f.read().strip()
    # Process only new data
    new_data = processor.incremental_data_load('sales', last_processed)
    if not new_data.empty:
        # process_sales_data is the project's transformation step (defined elsewhere)
        processed_data = process_sales_data(new_data)
        # Save current state
        os.makedirs('state', exist_ok=True)
        with open(state_file, 'w') as f:
            f.write(datetime.now().isoformat())
        # Output for Visivo
        print(processed_data.to_csv(index=False))
    else:
        # No new data, output empty result
        print("date,revenue,orders")  # Headers only
Monitoring and Alerting
Integrate Python pipeline monitoring with Visivo's alerting system:
# scripts/monitored_pipeline.py
import logging
import os
import sys
import time
from datetime import datetime

import pandas as pd

class PipelineMonitor:
    def __init__(self):
        self.setup_logging()
        self.metrics = {}

    def setup_logging(self):
        """Configure pipeline logging"""
        os.makedirs('logs', exist_ok=True)
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('logs/pipeline.log'),
                logging.StreamHandler(sys.stdout)
            ]
        )
        self.logger = logging.getLogger(__name__)

    def monitor_execution_time(self, func):
        """Decorator to monitor function execution time"""
        def wrapper(*args, **kwargs):
            start_time = time.time()
            self.logger.info(f"Starting {func.__name__}")
            try:
                result = func(*args, **kwargs)
                execution_time = time.time() - start_time
                self.metrics[f"{func.__name__}_duration"] = execution_time
                self.logger.info(f"Completed {func.__name__} in {execution_time:.2f} seconds")
                return result
            except Exception as e:
                self.logger.error(f"Error in {func.__name__}: {str(e)}")
                self.metrics[f"{func.__name__}_error"] = str(e)
                raise
        return wrapper

    def track_data_quality_metrics(self, df, stage_name):
        """Track data quality metrics for monitoring"""
        metrics = {
            f"{stage_name}_row_count": len(df),
            f"{stage_name}_null_percentage": df.isnull().mean().mean() * 100,
            f"{stage_name}_duplicate_count": df.duplicated().sum(),
            f"{stage_name}_memory_usage_mb": df.memory_usage(deep=True).sum() / 1024 / 1024
        }
        self.metrics.update(metrics)
        # Log concerning metrics
        if metrics[f"{stage_name}_null_percentage"] > 10:
            self.logger.warning(f"High null percentage in {stage_name}: {metrics[f'{stage_name}_null_percentage']:.1f}%")
        if metrics[f"{stage_name}_duplicate_count"] > 0:
            self.logger.warning(f"Found {metrics[f'{stage_name}_duplicate_count']} duplicates in {stage_name}")

    def output_monitoring_data(self):
        """Output monitoring metrics for Visivo dashboard"""
        monitoring_df = pd.DataFrame([{
            'timestamp': datetime.now().isoformat(),
            'pipeline_run_id': f"run_{int(time.time())}",
            **self.metrics
        }])
        print(monitoring_df.to_csv(index=False))

# Usage in main pipeline
def main_pipeline():
    monitor = PipelineMonitor()

    # Decorate pipeline functions with monitoring
    @monitor.monitor_execution_time
    def extract_data():
        # Data extraction logic (fetch_sales_data is defined elsewhere in the project)
        data = fetch_sales_data()
        monitor.track_data_quality_metrics(data, "extraction")
        return data

    @monitor.monitor_execution_time
    def transform_data(raw_data):
        # Transformation logic (apply_transformations is defined elsewhere in the project)
        processed_data = apply_transformations(raw_data)
        monitor.track_data_quality_metrics(processed_data, "transformation")
        return processed_data

    try:
        # Run pipeline with monitoring
        raw_data = extract_data()
        processed_data = transform_data(raw_data)
        # Output both business data and monitoring metrics
        print("--- BUSINESS DATA ---")
        print(processed_data.to_csv(index=False))
        print("--- MONITORING DATA ---")
        monitor.output_monitoring_data()
    except Exception as e:
        monitor.logger.error(f"Pipeline failed: {str(e)}")
        sys.exit(1)

if __name__ == "__main__":
    main_pipeline()
Configure Visivo to visualize pipeline health, enabling reliable BI insights for stakeholders:
models:
  - name: pipeline_monitoring
    table_name: monitoring_data
    args:
      - sh
      - -c
      # Keep only the CSV that follows the monitoring marker in the script's output
      - "python scripts/monitored_pipeline.py | sed -n '/--- MONITORING DATA ---/,$p' | tail -n +2"

traces:
  - name: pipeline-execution-time
    model: ${ref(pipeline_monitoring)}
    columns:
      x: timestamp
      y: extraction_duration
    props:
      type: scatter
      mode: lines+markers
      x: column(x)
      y: column(y)
      name: "Extraction Time (s)"
  - name: data-quality-metrics
    model: ${ref(pipeline_monitoring)}
    columns:
      x: timestamp
      y: transformation_null_percentage
    props:
      type: scatter
      mode: lines+markers
      x: column(x)
      y: column(y)
      name: "Null Percentage (%)"

alerts:
  - name: pipeline-performance-alert
    model: ${ref(pipeline_monitoring)}
    if:
      condition: "extraction_duration > 300"  # 5 minutes
    destinations:
      - type: slack
        webhook_url: "{{ env_var('SLACK_WEBHOOK') }}"
        message: "⚠️ Pipeline extraction taking longer than expected: {{ extraction_duration }}s"
Conclusion
Integrating Python data pipelines with Visivo creates a powerful, scalable analytics infrastructure that combines the flexibility of Python's ecosystem with the simplicity of BI-as-code. This approach enables:
- Automated data workflows that run reliably in production
- Version-controlled analytics with full audit trails
- Testable data pipelines with comprehensive quality checks
- Scalable processing for large datasets and real-time streams
- Monitoring and alerting for production reliability
By leveraging Python's strengths in data processing alongside Visivo's YAML-based dashboard configuration, teams can build analytics systems that are both powerful and maintainable. The command-based model pattern provides the perfect bridge between complex data processing logic and clean visualization configuration.
Whether you're building simple ETL pipelines or complex machine learning workflows, the Python + Visivo combination offers a modern approach to analytics that scales with your organization's data needs while maintaining the simplicity and collaboration benefits of BI-as-code.
For related topics, explore our guides on dbt™ local development, CI/CD analytics implementation, and visualizations as code.