Markdown Table Data Validation and Quality Assurance: Complete Guide for Automated Testing and Error Detection in Tabular Content
Markdown table validation checks tabular content for accuracy, structural integrity, and consistent formatting across large documentation projects. With automated validation workflows, error detection, and quality checks, technical writers can catch table formatting errors, data inconsistencies, and accessibility violations before they reach readers.
Why Implement Table Data Validation?
Professional table validation provides essential benefits for content quality management:
- Data Integrity: Ensure accuracy and consistency of tabular information across documentation systems
- Error Prevention: Automatically detect and prevent formatting errors before content publication
- Quality Standards: Maintain consistent table structure and presentation across large content repositories
- Accessibility Compliance: Verify tables meet accessibility guidelines and screen reader compatibility
- Performance Optimization: Identify table structure issues that impact rendering performance
- Automated Workflows: Integrate validation into CI/CD pipelines for systematic quality assurance
Foundation Validation Principles
Core Table Structure Validation
Understanding essential validation criteria for robust table implementation:
# Basic Table Structure Validation Checklist
## Structural Integrity Requirements
| Validation Rule | Description | Error Impact | Example |
|:----------------|:------------|:-------------|:--------|
| **Header Count Match** | All rows must have same column count as header | Critical | Header: 3 cols, Row: 4 cols ❌ |
| **Pipe Alignment** | Proper pipe character placement and escaping | High | `\|` (escaped) vs. a bare pipe (unescaped) |
| **Separator Format** | Valid alignment markers in separator row | High | `:---:` center vs `---` left |
| **Empty Cell Handling** | Consistent treatment of empty table cells | Medium | Missing vs explicit empty cells |
| **Line Termination** | Proper line endings for cross-platform compatibility | Low | Unix vs Windows line endings |
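
The structural rules in this checklist can be checked with a few lines of standard-library Python. The sketch below is illustrative only: `split_row` and `check_structure` are hypothetical helper names, and the separator pattern assumes GitHub-style alignment markers.

```python
import re
from typing import List

# One separator cell: optional leading colon, one or more dashes, optional trailing colon.
SEPARATOR_CELL = re.compile(r'^:?-+:?$')


def split_row(line: str) -> List[str]:
    """Split a table row on unescaped pipes, dropping one outer pipe per side."""
    line = line.strip()
    if line.startswith('|'):
        line = line[1:]
    if line.endswith('|') and not line.endswith('\\|'):
        line = line[:-1]
    # The lookbehind skips pipes that are escaped as \|
    return [cell.strip() for cell in re.split(r'(?<!\\)\|', line)]


def check_structure(table: str) -> List[str]:
    """Return human-readable problems; an empty list means the structure is sound."""
    lines = [ln for ln in table.splitlines() if ln.strip()]
    if len(lines) < 2:
        return ["table needs a header row and a separator row"]
    problems = []
    header = split_row(lines[0])
    separator = split_row(lines[1])
    if len(separator) != len(header):
        problems.append(f"separator has {len(separator)} cells, header has {len(header)}")
    for cell in separator:
        if not SEPARATOR_CELL.match(cell):
            problems.append(f"invalid separator cell: {cell!r}")
    for number, line in enumerate(lines[2:], start=1):
        cells = split_row(line)
        if len(cells) != len(header):
            problems.append(f"row {number} has {len(cells)} cells, expected {len(header)}")
    return problems
```

Returning a list of messages rather than raising on the first problem lets a caller report every structural issue in one pass.
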
## Data Type Consistency Validation
```yaml
# validation-rules.yml - Column-specific validation rules
# Column names match the table's header cells exactly
columns:
  - name: "Employee ID"
    type: "integer"
    required: true
    min_value: 1000
    max_value: 9999
    unique: true
  - name: "Email Address"
    type: "email"
    required: true
    pattern: "^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$"
    unique: true
  - name: "Hire Date"
    type: "date"
    required: true
    date_format: "%Y-%m-%d"  # strptime syntax for YYYY-MM-DD
    min_date: "2020-01-01"
    max_date: "today"
  - name: "Department"
    type: "categorical"
    required: true
    allowed_values: ["Engineering", "Marketing", "Sales", "HR", "Finance"]
  - name: "Annual Salary"
    type: "currency"
    required: false
    min_value: 30000
    max_value: 500000
    format: "$X,XXX.XX"
```
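
The `min_date` and `max_date` keys, including the `today` sentinel, need a little interpretation before they can be enforced. The following is a minimal sketch under stated assumptions: dates are ISO `YYYY-MM-DD` strings, and `resolve_bound` and `check_date_range` are hypothetical helper names rather than part of any library.

```python
from datetime import date
from typing import Optional


def resolve_bound(bound: Optional[str], today: Optional[date] = None) -> Optional[date]:
    """Turn a configured bound into a date; "today" resolves at validation time."""
    if bound is None:
        return None
    if bound == "today":
        return today or date.today()
    return date.fromisoformat(bound)


def check_date_range(value: str, min_date: Optional[str] = None,
                     max_date: Optional[str] = None,
                     today: Optional[date] = None) -> Optional[str]:
    """Return an error message, or None when the value lies inside the range."""
    try:
        parsed = date.fromisoformat(value)
    except ValueError:
        return f"'{value}' is not a valid YYYY-MM-DD date"
    lower = resolve_bound(min_date, today)
    upper = resolve_bound(max_date, today)
    if lower is not None and parsed < lower:
        return f"{value} is earlier than {lower.isoformat()}"
    if upper is not None and parsed > upper:
        return f"{value} is later than {upper.isoformat()}"
    return None
```

The optional `today` parameter exists so that tests can pin the current date instead of depending on the system clock.
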
## Sample Employee Table for Validation Testing
| Employee ID | Full Name | Email Address | Hire Date | Department | Annual Salary |
|:------------|:----------|:--------------|:----------|:-----------|:-------------|
| 1001 | Sarah Chen | sarah.chen@example.com | 2023-01-15 | Engineering | $95,000.00 |
| 1002 | Michael Rodriguez | michael.rodriguez@example.com | 2023-02-01 | Marketing | $75,000.00 |
| 1003 | Lisa Wang | lisa.wang@example.com | 2023-03-10 | Sales | $68,000.00 |
| 1004 | James Thompson | james.thompson@example.com | 2023-04-22 | Engineering | $89,500.00 |
| 1005 | Emily Davis | emily.davis@example.com | 2023-05-08 | HR | $72,000.00 |
> **Validation Note**: This table demonstrates proper data formatting with consistent types, valid email formats, sequential ID numbers, and standardized currency formatting.
Automated Validation Implementation
Building comprehensive automated validation systems:
# Automated Table Validation Framework
## Python-Based Validation Engine
```python
#!/usr/bin/env python3
# markdown-table-validator.py - Comprehensive table validation system
import re
from dataclasses import dataclass, field
from datetime import datetime
from decimal import Decimal, InvalidOperation
from enum import Enum
from typing import Any, Dict, List, Optional

import email_validator  # third-party package: email-validator
import yaml  # third-party package: PyYAML


class ValidationSeverity(Enum):
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"


class DataType(Enum):
    TEXT = "text"
    INTEGER = "integer"
    FLOAT = "float"
    CURRENCY = "currency"
    EMAIL = "email"
    DATE = "date"
    PHONE = "phone"
    URL = "url"
    CATEGORICAL = "categorical"
    BOOLEAN = "boolean"


@dataclass
class ValidationError:
    """Individual validation error details"""
    rule: str
    message: str
    severity: ValidationSeverity
    row_index: Optional[int] = None
    column_index: Optional[int] = None
    column_name: Optional[str] = None
    actual_value: Optional[str] = None
    expected_value: Optional[str] = None
    fix_suggestion: Optional[str] = None


@dataclass
class ColumnValidationRule:
    """Validation rules for individual table columns"""
    name: str
    data_type: DataType
    required: bool = True
    unique: bool = False
    min_length: Optional[int] = None
    max_length: Optional[int] = None
    pattern: Optional[str] = None
    allowed_values: Optional[List[str]] = None
    min_value: Optional[float] = None
    max_value: Optional[float] = None
    date_format: Optional[str] = None
    min_date: Optional[str] = None
    max_date: Optional[str] = None
    custom_validator: Optional[str] = None


@dataclass
class TableValidationConfig:
    """Complete table validation configuration"""
    table_name: str
    description: str
    columns: List[ColumnValidationRule]
    allow_extra_columns: bool = False
    require_header: bool = True
    min_rows: int = 0
    max_rows: Optional[int] = None
    unique_constraints: List[List[str]] = field(default_factory=list)
    foreign_keys: Dict[str, str] = field(default_factory=dict)
class MarkdownTableValidator:
    """Comprehensive Markdown table validation system"""

    # A separator cell is a run of dashes with an optional colon on either side
    SEPARATOR_CELL = re.compile(r'^:?-+:?$')

    def __init__(self):
        self.email_validator = email_validator
        self.validation_errors = []
        self.validation_warnings = []
        self.performance_metrics = {}

    def validate_table_from_markdown(self, markdown_content: str, config: TableValidationConfig) -> Dict[str, Any]:
        """Validate a Markdown table against configuration rules"""
        start_time = datetime.now()
        self.validation_errors = []
        self.validation_warnings = []
        try:
            # Parse Markdown table
            table_data = self._parse_markdown_table(markdown_content)
            if not table_data:
                self._add_error("table_parsing", "Unable to parse Markdown table", ValidationSeverity.CRITICAL)
                return self._generate_validation_report(start_time)
            # Perform validation checks
            self._validate_table_structure(table_data, config)
            self._validate_column_headers(table_data, config)
            self._validate_row_count(table_data, config)
            self._validate_column_data(table_data, config)
            self._validate_unique_constraints(table_data, config)
            self._validate_foreign_keys(table_data, config)
            self._validate_accessibility(table_data, config)
        except Exception as e:
            self._add_error("validation_error", f"Validation process failed: {str(e)}", ValidationSeverity.CRITICAL)
        return self._generate_validation_report(start_time)

    def _parse_markdown_table(self, markdown_content: str) -> Optional[Dict[str, Any]]:
        """Parse Markdown table into structured data"""
        lines = [line.strip() for line in markdown_content.strip().split('\n') if line.strip()]
        # A header row and a separator row are the minimum; data rows may be absent
        if len(lines) < 2:
            return None
        header_line, separator_line, data_lines = lines[0], lines[1], lines[2:]
        # Parse header
        headers = self._parse_table_row(header_line)
        if not headers:
            return None
        # Parse separator and detect alignment
        separators = self._parse_table_row(separator_line)
        if len(separators) != len(headers):
            return None
        if not all(self.SEPARATOR_CELL.match(sep) for sep in separators):
            return None
        alignments = [self._detect_alignment(sep) for sep in separators]
        # Parse data rows; column-count mismatches are reported once,
        # by _validate_table_structure, to avoid duplicate errors
        rows = [self._parse_table_row(line) for line in data_lines]
        return {
            'headers': headers,
            'alignments': alignments,
            'rows': rows,
            'raw_content': markdown_content
        }

    def _parse_table_row(self, row: str) -> List[str]:
        """Parse a table row into individual cells"""
        row = row.strip()
        # Drop exactly one outer pipe per side so empty edge cells survive
        if row.startswith('|'):
            row = row[1:]
        if row.endswith('|') and not row.endswith('\\|'):
            row = row[:-1]
        if not row.strip():
            return []
        cells = []
        current_cell = ""
        escaped = False
        for char in row:
            if escaped:
                current_cell += char
                escaped = False
            elif char == '\\':
                escaped = True
                current_cell += char
            elif char == '|':
                cells.append(current_cell.strip())
                current_cell = ""
            else:
                current_cell += char
        if current_cell or cells:  # Handle final cell
            cells.append(current_cell.strip())
        return cells

    def _detect_alignment(self, separator: str) -> str:
        """Detect column alignment from separator"""
        separator = separator.strip()
        if separator.startswith(':') and separator.endswith(':'):
            return 'center'
        elif separator.endswith(':'):
            return 'right'
        else:
            return 'left'

    def _validate_table_structure(self, table_data: Dict[str, Any], config: TableValidationConfig):
        """Validate basic table structure"""
        headers = table_data['headers']
        rows = table_data['rows']
        # Check if table has required header
        if config.require_header and not headers:
            self._add_error("missing_header", "Table requires header row", ValidationSeverity.CRITICAL)
        # Check for empty table
        if not rows:
            self._add_error("empty_table", "Table contains no data rows", ValidationSeverity.HIGH)
        # Validate column count consistency
        expected_columns = len(headers) if headers else (len(rows[0]) if rows else 0)
        for i, row in enumerate(rows):
            if len(row) != expected_columns:
                self._add_error("column_count_mismatch",
                                f"Row {i+1} has {len(row)} columns, expected {expected_columns}",
                                ValidationSeverity.HIGH, row_index=i+1)

    def _validate_column_headers(self, table_data: Dict[str, Any], config: TableValidationConfig):
        """Validate column headers against configuration"""
        headers = table_data['headers']
        config_columns = {col.name for col in config.columns}
        actual_columns = set(headers) if headers else set()
        # Check for missing required columns (sorted for stable messages)
        missing_columns = config_columns - actual_columns
        if missing_columns:
            self._add_error("missing_columns",
                            f"Missing required columns: {', '.join(sorted(missing_columns))}",
                            ValidationSeverity.CRITICAL)
        # Check for unexpected columns
        if not config.allow_extra_columns:
            extra_columns = actual_columns - config_columns
            if extra_columns:
                self._add_warning("extra_columns",
                                  f"Unexpected columns found: {', '.join(sorted(extra_columns))}",
                                  ValidationSeverity.MEDIUM)
        # Validate header naming conventions
        for header in headers:
            if not re.match(r'^[a-zA-Z][a-zA-Z0-9_\s]*$', header):
                self._add_warning("header_naming",
                                  f"Header '{header}' doesn't follow naming conventions",
                                  ValidationSeverity.LOW)

    def _validate_row_count(self, table_data: Dict[str, Any], config: TableValidationConfig):
        """Validate table row count constraints"""
        row_count = len(table_data['rows'])
        if row_count < config.min_rows:
            self._add_error("insufficient_rows",
                            f"Table has {row_count} rows, minimum required: {config.min_rows}",
                            ValidationSeverity.HIGH)
        if config.max_rows is not None and row_count > config.max_rows:
            self._add_error("excessive_rows",
                            f"Table has {row_count} rows, maximum allowed: {config.max_rows}",
                            ValidationSeverity.MEDIUM)

    def _validate_column_data(self, table_data: Dict[str, Any], config: TableValidationConfig):
        """Validate individual column data against rules"""
        headers = table_data['headers']
        rows = table_data['rows']
        if not headers:
            return
        # Create column rule lookup
        column_rules = {rule.name: rule for rule in config.columns}
        for col_index, header in enumerate(headers):
            if header not in column_rules:
                continue
            rule = column_rules[header]
            column_data = [row[col_index] if col_index < len(row) else "" for row in rows]
            self._validate_column_values(column_data, rule, col_index, header)

    def _validate_column_values(self, column_data: List[str], rule: ColumnValidationRule,
                                col_index: int, column_name: str):
        """Validate individual column values"""
        for row_index, value in enumerate(column_data):
            # Check required fields
            if rule.required and (not value or value.strip() == ""):
                self._add_error("required_field_empty",
                                f"Required field '{column_name}' is empty",
                                ValidationSeverity.HIGH, row_index=row_index+1,
                                column_index=col_index, column_name=column_name)
                continue
            if not value or value.strip() == "":
                continue  # Skip validation for empty optional fields
            # Data type validation
            self._validate_data_type(value, rule, row_index+1, col_index, column_name)
            # Length validation
            if rule.min_length is not None and len(value) < rule.min_length:
                self._add_error("value_too_short",
                                f"Value '{value}' in '{column_name}' is too short (min: {rule.min_length})",
                                ValidationSeverity.MEDIUM, row_index=row_index+1,
                                column_index=col_index, column_name=column_name, actual_value=value)
            if rule.max_length is not None and len(value) > rule.max_length:
                self._add_error("value_too_long",
                                f"Value '{value}' in '{column_name}' is too long (max: {rule.max_length})",
                                ValidationSeverity.MEDIUM, row_index=row_index+1,
                                column_index=col_index, column_name=column_name, actual_value=value)
            # Pattern validation
            if rule.pattern and not re.match(rule.pattern, value):
                self._add_error("pattern_mismatch",
                                f"Value '{value}' in '{column_name}' doesn't match required pattern",
                                ValidationSeverity.HIGH, row_index=row_index+1,
                                column_index=col_index, column_name=column_name, actual_value=value)
            # Categorical validation
            if rule.allowed_values and value not in rule.allowed_values:
                self._add_error("invalid_categorical_value",
                                f"Value '{value}' not in allowed values: {rule.allowed_values}",
                                ValidationSeverity.HIGH, row_index=row_index+1,
                                column_index=col_index, column_name=column_name, actual_value=value,
                                expected_value=f"One of: {', '.join(rule.allowed_values)}")

    def _validate_numeric_range(self, number, label: str, rule: ColumnValidationRule,
                                row_index: int, col_index: int, column_name: str):
        """Report values outside [min_value, max_value]; a bound of 0 is still enforced"""
        if rule.min_value is not None and number < rule.min_value:
            self._add_error("value_below_minimum",
                            f"{label} value {number} below minimum {rule.min_value}",
                            ValidationSeverity.MEDIUM, row_index=row_index,
                            column_index=col_index, column_name=column_name)
        if rule.max_value is not None and number > rule.max_value:
            self._add_error("value_above_maximum",
                            f"{label} value {number} above maximum {rule.max_value}",
                            ValidationSeverity.MEDIUM, row_index=row_index,
                            column_index=col_index, column_name=column_name)

    def _validate_data_type(self, value: str, rule: ColumnValidationRule,
                            row_index: int, col_index: int, column_name: str):
        """Validate specific data types"""
        try:
            if rule.data_type == DataType.INTEGER:
                int_value = int(value.replace(',', ''))
                self._validate_numeric_range(int_value, "Integer", rule,
                                             row_index, col_index, column_name)
            elif rule.data_type == DataType.FLOAT:
                # Accept percent-formatted values such as "+12.3%"
                float_value = float(value.replace(',', '').rstrip('%'))
                self._validate_numeric_range(float_value, "Float", rule,
                                             row_index, col_index, column_name)
            elif rule.data_type == DataType.CURRENCY:
                currency_value = Decimal(value.replace('$', '').replace(',', ''))
                self._validate_numeric_range(currency_value, "Currency", rule,
                                             row_index, col_index, column_name)
            elif rule.data_type == DataType.EMAIL:
                # Syntax check only; skip the DNS lookup so validation works offline
                email_validator.validate_email(value, check_deliverability=False)
            elif rule.data_type == DataType.DATE:
                if rule.date_format:
                    datetime.strptime(value, rule.date_format)
                else:
                    # Try common formats
                    for fmt in ['%Y-%m-%d', '%m/%d/%Y', '%d-%m-%Y']:
                        try:
                            datetime.strptime(value, fmt)
                            break
                        except ValueError:
                            continue
                    else:
                        raise ValueError("Invalid date format")
            elif rule.data_type == DataType.PHONE:
                # Simple phone validation
                phone_pattern = r'^[\+]?[\d\s\-\(\)]{10,}$'
                if not re.match(phone_pattern, value):
                    raise ValueError("Invalid phone format")
            elif rule.data_type == DataType.URL:
                url_pattern = r'^https?://[^\s/$.?#].[^\s]*$'
                if not re.match(url_pattern, value, re.IGNORECASE):
                    raise ValueError("Invalid URL format")
            elif rule.data_type == DataType.BOOLEAN:
                if value.lower() not in ['true', 'false', 'yes', 'no', '1', '0']:
                    raise ValueError("Invalid boolean value")
        except (ValueError, InvalidOperation, email_validator.EmailNotValidError) as e:
            self._add_error("data_type_validation",
                            f"Invalid {rule.data_type.value} format: '{value}' - {str(e)}",
                            ValidationSeverity.HIGH, row_index=row_index,
                            column_index=col_index, column_name=column_name, actual_value=value)

    def _validate_unique_constraints(self, table_data: Dict[str, Any], config: TableValidationConfig):
        """Validate uniqueness constraints"""
        headers = table_data['headers']
        rows = table_data['rows']
        if not headers:
            return
        # Individual column uniqueness
        column_rules = {rule.name: rule for rule in config.columns}
        for col_index, header in enumerate(headers):
            if header in column_rules and column_rules[header].unique:
                column_data = [row[col_index] if col_index < len(row) else "" for row in rows]
                seen_values = {}
                for row_index, value in enumerate(column_data):
                    if value and value in seen_values:
                        self._add_error("unique_constraint_violation",
                                        f"Duplicate value '{value}' in unique column '{header}'",
                                        ValidationSeverity.HIGH, row_index=row_index+1,
                                        column_index=col_index, column_name=header, actual_value=value,
                                        fix_suggestion=f"Previously found in row {seen_values[value]}")
                    elif value:
                        seen_values[value] = row_index + 1
        # Multi-column uniqueness constraints
        for constraint_columns in config.unique_constraints:
            if all(col in headers for col in constraint_columns):
                constraint_indices = [headers.index(col) for col in constraint_columns]
                seen_combinations = {}
                for row_index, row in enumerate(rows):
                    combination = tuple(row[i] if i < len(row) else "" for i in constraint_indices)
                    if combination in seen_combinations:
                        self._add_error("multi_column_unique_violation",
                                        f"Duplicate combination {combination} in columns {constraint_columns}",
                                        ValidationSeverity.HIGH, row_index=row_index+1,
                                        fix_suggestion=f"Previously found in row {seen_combinations[combination]}")
                    else:
                        seen_combinations[combination] = row_index + 1

    def _validate_foreign_keys(self, table_data: Dict[str, Any], config: TableValidationConfig):
        """Validate foreign key constraints"""
        # Note: This is a simplified implementation
        # In practice, you would need access to related tables
        for column_name, referenced_table in config.foreign_keys.items():
            self._add_warning("foreign_key_check",
                              f"Foreign key validation for '{column_name}' -> '{referenced_table}' requires external data",
                              ValidationSeverity.INFO)

    def _validate_accessibility(self, table_data: Dict[str, Any], config: TableValidationConfig):
        """Validate table accessibility requirements"""
        headers = table_data['headers']
        # Check for meaningful headers
        if headers:
            for header in headers:
                if not header or header.strip() == "":
                    self._add_error("empty_header",
                                    "Table contains empty header cells",
                                    ValidationSeverity.HIGH)
                elif len(header.strip()) < 2:
                    self._add_warning("short_header",
                                      f"Header '{header}' may not be descriptive enough for screen readers",
                                      ValidationSeverity.LOW)
        # Check table size for accessibility
        row_count = len(table_data['rows'])
        col_count = len(headers) if headers else 0
        if row_count > 50 or col_count > 10:
            self._add_warning("large_table",
                              f"Large table ({row_count} rows, {col_count} cols) may need pagination for accessibility",
                              ValidationSeverity.MEDIUM)

    def _add_error(self, rule: str, message: str, severity: ValidationSeverity,
                   row_index: Optional[int] = None, column_index: Optional[int] = None,
                   column_name: Optional[str] = None, actual_value: Optional[str] = None,
                   expected_value: Optional[str] = None, fix_suggestion: Optional[str] = None):
        """Add validation error to results"""
        error = ValidationError(
            rule=rule,
            message=message,
            severity=severity,
            row_index=row_index,
            column_index=column_index,
            column_name=column_name,
            actual_value=actual_value,
            expected_value=expected_value,
            fix_suggestion=fix_suggestion
        )
        # CRITICAL and HIGH findings fail the run; everything else is a warning
        if severity in [ValidationSeverity.CRITICAL, ValidationSeverity.HIGH]:
            self.validation_errors.append(error)
        else:
            self.validation_warnings.append(error)

    def _add_warning(self, rule: str, message: str, severity: ValidationSeverity, **kwargs):
        """Add validation warning"""
        self._add_error(rule, message, severity, **kwargs)

    def _generate_validation_report(self, start_time: datetime) -> Dict[str, Any]:
        """Generate comprehensive validation report"""
        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()
        return {
            'validation_summary': {
                'status': 'FAILED' if self.validation_errors else 'PASSED',
                'error_count': len(self.validation_errors),
                'warning_count': len(self.validation_warnings),
                'duration_seconds': duration,
                'timestamp': end_time.isoformat()
            },
            'errors': [self._error_to_dict(error) for error in self.validation_errors],
            'warnings': [self._error_to_dict(error) for error in self.validation_warnings],
            'metrics': {
                'critical_errors': len([e for e in self.validation_errors if e.severity == ValidationSeverity.CRITICAL]),
                'high_errors': len([e for e in self.validation_errors if e.severity == ValidationSeverity.HIGH]),
                'medium_warnings': len([e for e in self.validation_warnings if e.severity == ValidationSeverity.MEDIUM]),
                'low_warnings': len([e for e in self.validation_warnings if e.severity == ValidationSeverity.LOW])
            }
        }

    def _error_to_dict(self, error: ValidationError) -> Dict[str, Any]:
        """Convert validation error to dictionary"""
        return {
            'rule': error.rule,
            'message': error.message,
            'severity': error.severity.value,
            'location': {
                'row': error.row_index,
                'column': error.column_index,
                'column_name': error.column_name
            },
            'values': {
                'actual': error.actual_value,
                'expected': error.expected_value
            },
            'fix_suggestion': error.fix_suggestion
        }
# Configuration loading utility
def load_validation_config(config_path: str) -> TableValidationConfig:
    """Load validation configuration from YAML file"""
    with open(config_path, 'r', encoding='utf-8') as f:
        config_data = yaml.safe_load(f)
    columns = []
    for col_config in config_data.get('columns', []):
        column_rule = ColumnValidationRule(
            name=col_config['name'],
            data_type=DataType(col_config.get('type', 'text')),
            required=col_config.get('required', True),
            unique=col_config.get('unique', False),
            min_length=col_config.get('min_length'),
            max_length=col_config.get('max_length'),
            pattern=col_config.get('pattern'),
            allowed_values=col_config.get('allowed_values'),
            min_value=col_config.get('min_value'),
            max_value=col_config.get('max_value'),
            date_format=col_config.get('date_format'),
            min_date=col_config.get('min_date'),
            max_date=col_config.get('max_date'),
            custom_validator=col_config.get('custom_validator')
        )
        columns.append(column_rule)
    return TableValidationConfig(
        table_name=config_data.get('table_name', 'Unknown'),
        description=config_data.get('description', ''),
        columns=columns,
        allow_extra_columns=config_data.get('allow_extra_columns', False),
        require_header=config_data.get('require_header', True),
        min_rows=config_data.get('min_rows', 0),
        max_rows=config_data.get('max_rows'),
        unique_constraints=config_data.get('unique_constraints', []),
        foreign_keys=config_data.get('foreign_keys', {})
    )
# Demonstration function
def demonstrate_table_validation():
    """Demonstrate table validation system"""
    # Sample validation configuration
    config = TableValidationConfig(
        table_name="employee_directory",
        description="Employee directory table validation",
        columns=[
            ColumnValidationRule(
                name="Employee ID",
                data_type=DataType.INTEGER,
                required=True,
                unique=True,
                min_value=1000,
                max_value=9999
            ),
            ColumnValidationRule(
                name="Full Name",
                data_type=DataType.TEXT,
                required=True,
                min_length=2,
                max_length=50
            ),
            ColumnValidationRule(
                name="Email Address",
                data_type=DataType.EMAIL,
                required=True,
                unique=True
            ),
            ColumnValidationRule(
                name="Department",
                data_type=DataType.CATEGORICAL,
                required=True,
                allowed_values=["Engineering", "Marketing", "Sales", "HR", "Finance"]
            )
        ],
        min_rows=1,
        max_rows=1000
    )
    # Sample Markdown table with intentional errors for testing.
    # The example.com addresses are placeholders on a reserved domain.
    test_table = """| Employee ID | Full Name | Email Address | Department | Annual Salary |
|:------------|:----------|:--------------|:-----------|:-------------|
| 1001 | Sarah Chen | sarah.chen@example.com | Engineering | $95,000.00 |
| 1002 | Mike | invalid-email | Marketing | $75,000.00 |
| 1001 | Lisa Wang | lisa.wang@example.com | InvalidDept | $68,000.00 |
| abc | James Thompson | james.thompson@example.com | | $89,500.00 |
| | Emily Davis | emily.davis@example.com | HR | $72,000.00 |"""
    # Run validation
    validator = MarkdownTableValidator()
    results = validator.validate_table_from_markdown(test_table, config)
    # Display results
    print("=== Table Validation Results ===")
    print(f"Status: {results['validation_summary']['status']}")
    print(f"Errors: {results['validation_summary']['error_count']}")
    print(f"Warnings: {results['validation_summary']['warning_count']}")
    print(f"Duration: {results['validation_summary']['duration_seconds']:.3f}s")
    print("\n=== Validation Errors ===")
    for error in results['errors']:
        print(f"❌ {error['rule']}: {error['message']}")
        if error['location']['row']:
            print(f"   Location: Row {error['location']['row']}, Column {error['location']['column_name']}")
        if error['fix_suggestion']:
            print(f"   Fix: {error['fix_suggestion']}")
    print("\n=== Validation Warnings ===")
    for warning in results['warnings']:
        print(f"⚠️ {warning['rule']}: {warning['message']}")


if __name__ == "__main__":
    demonstrate_table_validation()
```
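
The validator above expects a string that contains exactly one table. In a real document, tables first have to be located among prose and fenced code blocks. The sketch below shows one way to do that with the standard library. `extract_tables` is a hypothetical helper, and its detection heuristic (a pipe-bearing line followed by a separator row) is an assumption that will not cover every Markdown dialect.

```python
import re
from typing import List

# A separator row: dash runs with optional colons, split by pipes.
SEPARATOR_ROW = re.compile(r'^\s*\|?\s*:?-+:?\s*(\|\s*:?-+:?\s*)*\|?\s*$')
# Opening or closing marker of a fenced code block (three backticks or tildes).
FENCE = re.compile(r'^\s*(`{3}|~{3})')


def extract_tables(markdown: str) -> List[str]:
    """Return each table found outside fenced code blocks as one string."""
    lines = markdown.splitlines()
    tables: List[str] = []
    in_fence = False
    i = 0
    while i < len(lines):
        if FENCE.match(lines[i]):
            in_fence = not in_fence
            i += 1
            continue
        starts_table = (
            not in_fence
            and '|' in lines[i]
            and i + 1 < len(lines)
            and '|' in lines[i + 1]
            and SEPARATOR_ROW.match(lines[i + 1]) is not None
        )
        if not starts_table:
            i += 1
            continue
        # Consume the header, the separator, and every following row with a pipe.
        end = i + 2
        while end < len(lines) and '|' in lines[end]:
            end += 1
        tables.append('\n'.join(lines[i:end]))
        i = end
    return tables
```

Each returned string can then be passed to `validate_table_from_markdown` together with the configuration that applies to that table.
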
Content-Specific Validation Strategies
Financial Data Validation
Implementing specialized validation for financial and numerical content:
# Financial Table Validation Patterns
## Revenue and Financial Performance Validation
```yaml
# financial-validation-config.yml
table_name: "quarterly_revenue_report"
description: "Quarterly financial performance data validation"
columns:
  - name: "Quarter"
    type: "text"
    required: true
    pattern: "^Q[1-4] 20[0-9]{2}$"
  - name: "Revenue"
    type: "currency"
    required: true
    min_value: 0
    max_value: 10000000
    format: "$X,XXX,XXX.XX"
  - name: "Expenses"
    type: "currency"
    required: true
    min_value: 0
    max_value: 10000000
  - name: "Profit"
    type: "currency"
    required: false  # Can be calculated
  - name: "Growth Rate"
    type: "float"
    required: false
    min_value: -100
    max_value: 1000
    format: "X.X%"
business_rules:
  - name: "profit_calculation"
    description: "Profit should equal Revenue minus Expenses"
    validation: "profit = revenue - expenses"
    tolerance: 0.01
  - name: "growth_rate_logic"
    description: "Growth rate should be reasonable"
    validation: "growth_rate >= -50 AND growth_rate <= 200"
  - name: "quarterly_progression"
    description: "Quarters should be in chronological order"
    validation: "quarters_sequential"
```
## Sample Financial Table with Validation
| Quarter | Revenue | Expenses | Profit | Growth Rate |
|:--------|--------:|---------:|-------:|-------------|
| Q1 2024 | $1,245,000 | $987,000 | $258,000 | +12.3% |
| Q2 2024 | $1,389,000 | $1,045,000 | $344,000 | +18.5% |
| Q3 2024 | $1,567,000 | $1,123,000 | $444,000 | +22.1% |
| Q4 2024 | $1,789,000 | $1,234,000 | $555,000 | +28.7% |
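
The `quarterly_progression` rule can be checked against the Quarter column of a table like this one. A minimal sketch follows, assuming that "chronological order" means each quarter is strictly later than the one before it (gaps are allowed). `parse_quarter` and `check_quarter_sequence` are hypothetical names.

```python
import re
from typing import List, Optional, Tuple

# Same label shape as the "Quarter" column pattern in the configuration.
QUARTER_LABEL = re.compile(r'^Q([1-4]) (20[0-9]{2})$')


def parse_quarter(label: str) -> Optional[Tuple[int, int]]:
    """Convert "Q3 2024" into (2024, 3); return None for malformed labels."""
    match = QUARTER_LABEL.match(label.strip())
    if match is None:
        return None
    return int(match.group(2)), int(match.group(1))


def check_quarter_sequence(labels: List[str]) -> List[str]:
    """Report labels that are malformed or not later than their predecessor."""
    problems = []
    previous_label, previous = None, None
    for label in labels:
        current = parse_quarter(label)
        if current is None:
            problems.append(f"malformed quarter label: {label!r}")
            continue
        # (year, quarter) tuples compare chronologically
        if previous is not None and current <= previous:
            problems.append(f"{label} does not come after {previous_label}")
        previous_label, previous = label, current
    return problems
```

Converting each label to a `(year, quarter)` tuple keeps the comparison correct across year boundaries, where comparing the label strings directly would not.
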
## Advanced Financial Validation Rules
```python
# financial-validators.py - Specialized financial validation
import re
from decimal import Decimal, InvalidOperation
from typing import Any, Dict, List


class FinancialTableValidator:
    """Specialized validator for financial data tables"""

    def __init__(self):
        # Require a digit right after "$" so strings such as "$," are rejected
        self.currency_pattern = r'^\$\d[\d,]*(\.\d+)?$'
        self.percentage_pattern = r'^[\+\-]?\d+\.?\d*%$'

    def validate_currency_format(self, value: str) -> Dict[str, Any]:
        """Validate currency formatting"""
        if not re.match(self.currency_pattern, value):
            return {
                'valid': False,
                'error': f"Invalid currency format: {value}",
                'expected': "$X,XXX.XX format"
            }
        # Extract numeric value
        numeric_value = Decimal(value.replace('$', '').replace(',', ''))
        return {
            'valid': True,
            'numeric_value': numeric_value,
            'formatted_value': value
        }

    def validate_percentage_format(self, value: str) -> Dict[str, Any]:
        """Validate percentage formatting"""
        if not re.match(self.percentage_pattern, value):
            return {
                'valid': False,
                'error': f"Invalid percentage format: {value}",
                'expected': "+/-X.X% format"
            }
        # Extract numeric value
        numeric_value = float(value.replace('%', '').replace('+', ''))
        return {
            'valid': True,
            'numeric_value': numeric_value,
            'formatted_value': value
        }

    def validate_profit_calculation(self, revenue: str, expenses: str, profit: str, tolerance: float = 0.01) -> Dict[str, Any]:
        """Validate profit calculation accuracy"""
        try:
            rev_val = Decimal(revenue.replace('$', '').replace(',', ''))
            exp_val = Decimal(expenses.replace('$', '').replace(',', ''))
            profit_val = Decimal(profit.replace('$', '').replace(',', ''))
            calculated_profit = rev_val - exp_val
            difference = abs(calculated_profit - profit_val)
            if difference > Decimal(str(tolerance)):
                return {
                    'valid': False,
                    'error': f"Profit calculation error. Expected: ${calculated_profit}, Got: {profit}",
                    'calculated_value': f"${calculated_profit:,.2f}",
                    'difference': f"${difference:,.2f}"
                }
            return {'valid': True, 'calculated_value': f"${calculated_profit:,.2f}"}
        except (InvalidOperation, AttributeError) as e:
            # Raised when a cell is not a parseable amount or not a string
            return {
                'valid': False,
                'error': f"Error in profit calculation validation: {str(e)}"
            }

    def validate_growth_trend(self, values: List[float], period_names: List[str]) -> Dict[str, Any]:
        """Validate growth trend logical consistency"""
        anomalies = []
        for i, value in enumerate(values):
            # Growth is undefined for the first period and after a zero value
            if i > 0 and values[i-1] != 0:
                growth_rate = ((value - values[i-1]) / values[i-1]) * 100
                # Check for extreme growth rates
                if abs(growth_rate) > 200:  # 200% growth in one period
                    anomalies.append({
                        'period': period_names[i],
                        'growth_rate': f"{growth_rate:.1f}%",
                        'issue': "Extreme growth rate detected"
                    })
            # Check for negative values, including the first period
            if value < 0:
                anomalies.append({
                    'period': period_names[i],
                    'value': value,
                    'issue': "Negative value detected"
                })
        return {
            'valid': len(anomalies) == 0,
            'anomalies': anomalies,
            'trend_analysis': self._analyze_trend(values)
        }

    def _analyze_trend(self, values: List[float]) -> str:
        """Analyze overall trend in financial data"""
        if len(values) < 2:
            return "Insufficient data for trend analysis"
        increasing = sum(1 for i in range(1, len(values)) if values[i] > values[i-1])
        decreasing = sum(1 for i in range(1, len(values)) if values[i] < values[i-1])
        total_periods = len(values) - 1
        if increasing == total_periods:
            return "Consistent growth trend"
        elif decreasing == total_periods:
            return "Consistent decline trend"
        elif increasing > decreasing:
            return "Generally increasing with fluctuations"
        elif decreasing > increasing:
            return "Generally decreasing with fluctuations"
        else:
            return "Volatile with no clear trend"
```
Technical Documentation Validation
Specialized validation for technical tables and API documentation:
# Technical Documentation Validation
## API Endpoint Documentation Validation
```yaml
# api-docs-validation.yml
table_name: "api_endpoints"
description: "REST API endpoint documentation validation"
columns:
  - name: "Endpoint"
    type: "text"
    required: true
    pattern: "^/api/v[0-9]+/[a-z0-9/_-]+$"
  - name: "Method"
    type: "categorical"
    required: true
    allowed_values: ["GET", "POST", "PUT", "PATCH", "DELETE"]
  - name: "Authentication"
    type: "categorical"
    required: true
    allowed_values: ["None", "API Key", "Bearer Token", "Basic Auth", "OAuth"]
  - name: "Rate Limit"
    type: "text"
    required: true
    pattern: "^[0-9]+/(second|minute|hour|day)$"
  - name: "Response Format"
    type: "categorical"
    required: true
    allowed_values: ["JSON", "XML", "Plain Text", "Binary"]
validation_rules:
  - name: "endpoint_uniqueness"
    description: "Endpoint + Method combination must be unique"
    type: "composite_unique"
    columns: ["Endpoint", "Method"]
  - name: "rate_limit_reasonableness"
    description: "Rate limits should be reasonable for method type"
    type: "business_logic"
    rules:
      - "GET methods: rate_limit <= 1000/minute"
      - "POST/PUT/PATCH methods: rate_limit <= 100/minute"
      - "DELETE methods: rate_limit <= 10/minute"
```
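
The `rate_limit_reasonableness` rules compare limits that may be written in different units, so a checker has to normalize them first. The sketch below is one possible interpretation, not a definitive implementation: `requests_per_minute` and `check_rate_limit` are hypothetical names, and the per-minute ceilings are copied from the rule text above.

```python
import re
from typing import Optional

SECONDS_PER_UNIT = {"second": 1, "minute": 60, "hour": 3600, "day": 86400}
# Per-minute ceilings taken from the business rules in the configuration.
MAX_PER_MINUTE = {"GET": 1000, "POST": 100, "PUT": 100, "PATCH": 100, "DELETE": 10}
# Same shape as the "Rate Limit" column pattern.
RATE_LIMIT = re.compile(r'^([0-9]+)/(second|minute|hour|day)$')


def requests_per_minute(rate_limit: str) -> Optional[float]:
    """Normalize "N/unit" to requests per minute; None if it does not parse."""
    match = RATE_LIMIT.match(rate_limit)
    if match is None:
        return None
    count, unit = int(match.group(1)), match.group(2)
    return count * 60 / SECONDS_PER_UNIT[unit]


def check_rate_limit(method: str, rate_limit: str) -> Optional[str]:
    """Return an error message, or None when the limit is within the ceiling."""
    per_minute = requests_per_minute(rate_limit)
    if per_minute is None:
        return f"unparseable rate limit: {rate_limit!r}"
    ceiling = MAX_PER_MINUTE.get(method)
    if ceiling is None:
        return f"no ceiling defined for method {method!r}"
    if per_minute > ceiling:
        return f"{method} allows {per_minute:g}/minute, ceiling is {ceiling}/minute"
    return None
```

Normalizing to a single unit before comparing means `1/second` on a `DELETE` endpoint is flagged even though the raw number looks small.
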
## Software Configuration Validation
| Configuration Key | Data Type | Default Value | Valid Range | Required | Environment |
|:------------------|:----------|:--------------|:------------|:---------|:------------|
| **server.port** | integer | 8080 | 1024-65535 | Yes | All |
| **database.url** | url | jdbc:postgresql://localhost:5432/app | Valid JDBC URL | Yes | All |
| **cache.ttl** | duration | 300s | 1s-3600s | No | Production |
| **logging.level** | categorical | INFO | DEBUG, INFO, WARN, ERROR | No | All |
| **security.enabled** | boolean | true | true, false | Yes | Production |
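One check such a table invites is whether each default value actually falls inside its own valid range. The sketch below is one possible reading of the columns, not part of the guide's validator: `default_in_range` is a hypothetical helper, durations are assumed to be whole seconds with an `s` suffix, and types such as `url` are deliberately not checked.

```python
import re

DURATION_RE = re.compile(r"^([0-9]+)s$")


def parse_duration_seconds(text):
    """Parse '300s' into 300; return None for anything else."""
    match = DURATION_RE.match(text.strip())
    return int(match.group(1)) if match else None


def default_in_range(data_type, default, valid_range):
    """Return True if the default value is consistent with the valid range."""
    if data_type == "integer":
        try:
            low, high = (int(part) for part in valid_range.split("-"))
            return low <= int(default) <= high
        except ValueError:
            return False  # non-numeric cell or malformed range
    if data_type == "duration":
        bounds = [parse_duration_seconds(part) for part in valid_range.split("-")]
        value = parse_duration_seconds(default)
        if value is None or len(bounds) != 2 or None in bounds:
            return False
        return bounds[0] <= value <= bounds[1]
    if data_type in ("categorical", "boolean"):
        allowed = [item.strip() for item in valid_range.split(",")]
        return default in allowed
    return True  # other types (for example url) are not range-checked here
```

Applied to the rows above, every default passes; changing `server.port` to `80` would fail because it is below `1024`.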
## Code Review Checklist Validation
```python
# code-review-validator.py - Specialized code review table validation
from typing import Any, Dict, List


class CodeReviewValidator:
    """Validator for code review checklist tables"""

    def __init__(self):
        self.valid_statuses = ['✅', '❌', '⚠️', '🔄', 'N/A']
        self.critical_checks = [
            'Security scan completed',
            'All tests passing',
            'Code style compliance',
            'Documentation updated'
        ]

    def validate_review_table(self, table_data: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Validate code review checklist table"""
        errors = []
        if 'Status' not in table_data['headers']:
            errors.append({
                'type': 'missing_column',
                'message': 'Status column required for review tables'
            })
            return errors

        status_index = table_data['headers'].index('Status')
        # Fall back to the first column when there is no 'Checkpoint' header
        checklist_index = table_data['headers'].index('Checkpoint') if 'Checkpoint' in table_data['headers'] else 0
        critical_checks_found = set()

        for row_idx, row in enumerate(table_data['rows']):
            if len(row) <= status_index:
                continue  # Row is too short to contain a status cell
            status = row[status_index]
            checkpoint = row[checklist_index] if len(row) > checklist_index else ''

            # Validate status format
            if status not in self.valid_statuses:
                errors.append({
                    'type': 'invalid_status',
                    'message': f"Invalid status '{status}' in row {row_idx + 1}",
                    'expected': f"One of: {', '.join(self.valid_statuses)}",
                    'row': row_idx + 1
                })

            # Track critical checks
            for critical_check in self.critical_checks:
                if critical_check.lower() in checkpoint.lower():
                    critical_checks_found.add(critical_check)
                    # Critical checks cannot be failed
                    if status == '❌':
                        errors.append({
                            'type': 'critical_check_failed',
                            'message': f"Critical check '{checkpoint}' failed",
                            'severity': 'critical',
                            'row': row_idx + 1
                        })

        # Check for missing critical checks
        missing_critical = set(self.critical_checks) - critical_checks_found
        for missing in missing_critical:
            errors.append({
                'type': 'missing_critical_check',
                'message': f"Missing critical check: {missing}",
                'severity': 'high'
            })
        return errors

    def calculate_review_score(self, table_data: Dict[str, Any]) -> Dict[str, Any]:
        """Calculate review completion score"""
        if 'Status' not in table_data['headers']:
            return {'error': 'Status column not found'}

        status_index = table_data['headers'].index('Status')
        total_checks = len(table_data['rows'])
        status_counts = {status: 0 for status in self.valid_statuses}

        for row in table_data['rows']:
            if len(row) > status_index:
                status = row[status_index]
                if status in status_counts:
                    status_counts[status] += 1

        completed = status_counts['✅']
        failed = status_counts['❌']
        in_progress = status_counts['🔄']
        warnings = status_counts['⚠️']
        completion_rate = (completed / total_checks) * 100 if total_checks > 0 else 0

        return {
            'total_checks': total_checks,
            'completed': completed,
            'failed': failed,
            'in_progress': in_progress,
            'warnings': warnings,
            'completion_rate': round(completion_rate, 1),
            'review_status': self._determine_review_status(completion_rate, failed, warnings)
        }

    def _determine_review_status(self, completion_rate: float, failed: int, warnings: int) -> str:
        """Determine overall review status"""
        if failed > 0:
            return "BLOCKED - Failed checks must be resolved"
        elif completion_rate < 80:
            return "IN_PROGRESS - More checks needed"
        elif warnings > 2:
            return "NEEDS_ATTENTION - Multiple warnings"
        elif completion_rate >= 95:
            return "APPROVED - Ready to merge"
        else:
            return "REVIEW_REQUIRED - Minor issues to address"
```
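The validator above expects `table_data` as a dictionary with a `headers` list and a `rows` list of cell lists. How that dictionary is built is not shown, so the following is a minimal sketch of one way to produce it from pipe-table text. `parse_markdown_table` is a hypothetical helper; it assumes the second line is the separator row and does not handle escaped pipes (`\|`) inside cells.

```python
def split_row(line):
    """Split one pipe-table line into trimmed cells, dropping the outer pipes."""
    cells = line.strip()
    if cells.startswith("|"):
        cells = cells[1:]
    if cells.endswith("|"):
        cells = cells[:-1]
    return [cell.strip() for cell in cells.split("|")]


def parse_markdown_table(table_text):
    """Parse a pipe table into {'headers': [...], 'rows': [[...], ...]}."""
    lines = [line for line in table_text.strip().splitlines() if line.strip()]
    if len(lines) < 2:
        raise ValueError("a table needs a header row and a separator row")
    headers = split_row(lines[0])
    rows = [split_row(line) for line in lines[2:]]  # lines[1] is the separator row
    return {"headers": headers, "rows": rows}


review_table = """
| Checkpoint | Status |
|:-----------|:------:|
| All tests passing | ✅ |
| Security scan completed | ❌ |
"""
table_data = parse_markdown_table(review_table)
```

The resulting `table_data` can then be passed to `validate_review_table` or `calculate_review_score`.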
CI/CD Integration and Automated Workflows
GitHub Actions Integration
Implementing table validation in automated workflows:
# CI/CD Integration for Table Validation
## GitHub Actions Workflow
```yaml
# .github/workflows/table-validation.yml
name: Markdown Table Validation

on:
  push:
    branches: [ main, develop ]
    paths:
      - '**/*.md'
      - 'validation-configs/**'
  pull_request:
    branches: [ main ]
    paths:
      - '**/*.md'
      - 'validation-configs/**'

jobs:
  validate-tables:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pyyaml pandas email-validator

      - name: Find modified markdown files
        id: changed-files
        uses: tj-actions/changed-files@v40
        with:
          files: |
            **/*.md

      - name: Run table validation
        run: |
          python scripts/validate-all-tables.py \
            --files="${{ steps.changed-files.outputs.all_changed_files }}" \
            --config-dir="validation-configs" \
            --output-format="github-actions" \
            --fail-on-error

      - name: Generate validation report
        if: always()
        run: |
          python scripts/generate-validation-report.py \
            --output="table-validation-report.html" \
            --include-suggestions \
            --include-statistics

      - name: Upload validation report
        if: always()
        # upload-artifact v3 is deprecated; v4 is the supported release
        uses: actions/upload-artifact@v4
        with:
          name: table-validation-report
          path: table-validation-report.html

      - name: Comment PR with validation results
        # always() keeps this step running when validation fails,
        # which is when the comment matters most
        if: always() && github.event_name == 'pull_request'
        uses: actions/github-script@v6
        with:
          script: |
            const fs = require('fs');
            try {
              const report = fs.readFileSync('validation-summary.md', 'utf8');
              await github.rest.issues.createComment({
                issue_number: context.issue.number,
                owner: context.repo.owner,
                repo: context.repo.repo,
                body: report
              });
            } catch (error) {
              console.log('No validation summary found or error reading file');
            }

  table-performance-check:
    runs-on: ubuntu-latest
    needs: validate-tables
    if: github.event_name == 'pull_request'
    steps:
      - uses: actions/checkout@v4

      - name: Set up Node.js
        uses: actions/setup-node@v4
        with:
          node-version: '18'

      - name: Install dependencies
        run: |
          npm install puppeteer lighthouse

      - name: Build documentation site
        run: |
          bundle install
          bundle exec jekyll build

      - name: Run table performance audit
        run: |
          node scripts/table-performance-audit.js \
            --site-dir="_site" \
            --output="performance-report.json"

      - name: Check performance thresholds
        run: |
          python scripts/check-performance-thresholds.py \
            --report="performance-report.json" \
            --max-render-time=100 \
            --max-memory-usage=50
```
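The PR comment step reads `validation-summary.md`, but none of the steps shown creates that file. A minimal sketch of producing it follows. It assumes the `summary` dictionary shape used by the batch validation script in this guide (`total_files`, `files_with_tables`, `files_validated`, `total_errors`, `total_warnings`, `validation_passed`); the function names `build_summary_markdown` and `write_summary` are hypothetical.

```python
def build_summary_markdown(results):
    """Render the batch validation summary as a short Markdown report."""
    summary = results["summary"]
    status = "✅ Passed" if summary["validation_passed"] else "❌ Failed"
    lines = [
        "## Table Validation Results",
        "",
        f"**Status:** {status}",
        "",
        "| Metric | Count |",
        "|:-------|------:|",
        f"| Files checked | {summary['total_files']} |",
        f"| Files with tables | {summary['files_with_tables']} |",
        f"| Files validated | {summary['files_validated']} |",
        f"| Errors | {summary['total_errors']} |",
        f"| Warnings | {summary['total_warnings']} |",
    ]
    return "\n".join(lines) + "\n"


def write_summary(results, path="validation-summary.md"):
    """Write the report where the PR comment step expects to find it."""
    with open(path, "w", encoding="utf-8") as handle:
        handle.write(build_summary_markdown(results))
```

Calling `write_summary(results)` at the end of the validation script, before it exits, would make the file available to the comment step.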
## Advanced Validation Script
```python
#!/usr/bin/env python3
# validate-all-tables.py - Batch table validation script
import sys
import json
import re
import argparse
from pathlib import Path
from typing import List, Dict, Any, Optional
import concurrent.futures

from markdown_table_validator import MarkdownTableValidator, load_validation_config


class BatchTableValidator:
    """Batch validation system for multiple Markdown files"""

    def __init__(self, config_dir: str, max_workers: int = 4):
        self.config_dir = Path(config_dir)
        self.max_workers = max_workers
        self.validator = MarkdownTableValidator()
        self.validation_configs = {}
        self.load_all_configs()

    def load_all_configs(self):
        """Load all validation configurations"""
        config_files = list(self.config_dir.glob('*.yml')) + list(self.config_dir.glob('*.yaml'))
        for config_file in config_files:
            try:
                config = load_validation_config(str(config_file))
                self.validation_configs[config.table_name] = config
                print(f"Loaded config for: {config.table_name}")
            except Exception as e:
                print(f"Error loading config {config_file}: {e}")

    def validate_files(self, file_paths: List[str]) -> Dict[str, Any]:
        """Validate multiple Markdown files"""
        results = {
            'summary': {
                'total_files': len(file_paths),
                'files_with_tables': 0,
                'files_validated': 0,
                'total_errors': 0,
                'total_warnings': 0,
                'validation_passed': True
            },
            'file_results': {}
        }

        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_file = {
                executor.submit(self.validate_file, file_path): file_path
                for file_path in file_paths
            }
            for future in concurrent.futures.as_completed(future_to_file):
                file_path = future_to_file[future]
                try:
                    file_result = future.result()
                    results['file_results'][file_path] = file_result
                    # Update summary
                    if file_result['tables_found'] > 0:
                        results['summary']['files_with_tables'] += 1
                    if file_result['validation_run']:
                        results['summary']['files_validated'] += 1
                    results['summary']['total_errors'] += file_result['total_errors']
                    results['summary']['total_warnings'] += file_result['total_warnings']
                    if file_result['total_errors'] > 0:
                        results['summary']['validation_passed'] = False
                except Exception as e:
                    results['file_results'][file_path] = {
                        'error': f"Validation failed: {str(e)}",
                        'tables_found': 0,
                        'validation_run': False,
                        'total_errors': 1,
                        'total_warnings': 0
                    }
                    results['summary']['total_errors'] += 1
                    results['summary']['validation_passed'] = False
        return results

    def validate_file(self, file_path: str) -> Dict[str, Any]:
        """Validate tables in a single Markdown file"""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()

            # Extract tables from markdown content
            tables = self.extract_tables(content)
            file_result = {
                'file_path': file_path,
                'tables_found': len(tables),
                'validation_run': False,
                'total_errors': 0,
                'total_warnings': 0,
                'table_results': []
            }
            if not tables:
                return file_result

            for i, table_content in enumerate(tables):
                table_result = self.validate_single_table(table_content, file_path, i)
                file_result['table_results'].append(table_result)
                file_result['total_errors'] += table_result['error_count']
                file_result['total_warnings'] += table_result['warning_count']
            file_result['validation_run'] = True
            return file_result
        except Exception as e:
            return {
                'file_path': file_path,
                'error': str(e),
                'tables_found': 0,
                'validation_run': False,
                'total_errors': 1,
                'total_warnings': 0
            }

    def extract_tables(self, content: str) -> List[str]:
        """Extract table content from Markdown"""
        # Match runs of consecutive lines that start and end with a pipe
        table_pattern = r'(\|[^\n]*\|(?:\n\|[^\n]*\|)*)'
        tables = re.findall(table_pattern, content, re.MULTILINE)

        # Filter out single-line false matches
        valid_tables = []
        for table in tables:
            lines = table.strip().split('\n')
            if len(lines) >= 3:  # Header + separator + at least one data row
                valid_tables.append(table.strip())
        return valid_tables

    def validate_single_table(self, table_content: str, file_path: str, table_index: int) -> Dict[str, Any]:
        """Validate a single table"""
        # Try to match table with appropriate config
        config = self.select_config_for_table(table_content)
        if not config:
            return {
                'table_index': table_index,
                'config_used': 'none',
                'validation_skipped': True,
                'reason': 'No matching validation config found',
                'error_count': 0,
                'warning_count': 0
            }

        validation_result = self.validator.validate_table_from_markdown(table_content, config)
        return {
            'table_index': table_index,
            'config_used': config.table_name,
            'validation_skipped': False,
            'error_count': validation_result['validation_summary']['error_count'],
            'warning_count': validation_result['validation_summary']['warning_count'],
            'status': validation_result['validation_summary']['status'],
            'errors': validation_result['errors'],
            'warnings': validation_result['warnings']
        }

    def select_config_for_table(self, table_content: str) -> Optional['TableValidationConfig']:
        """Select appropriate validation config for table"""
        # Simple heuristic: try to match based on header content
        lines = table_content.strip().split('\n')
        if len(lines) < 1:
            return None
        header_line = lines[0].lower()

        # Priority matching based on header keywords
        for config_name, config in self.validation_configs.items():
            config_keywords = config_name.lower().split('_')
            if any(keyword in header_line for keyword in config_keywords):
                return config

        # Fallback to first available config
        if self.validation_configs:
            return next(iter(self.validation_configs.values()))
        return None

    def generate_github_actions_output(self, results: Dict[str, Any]):
        """Generate GitHub Actions formatted output"""
        summary = results['summary']
        if not summary['validation_passed']:
            print("::error::Table validation failed")
        print(f"::notice::Validated {summary['files_validated']} files with {summary['files_with_tables']} containing tables")

        # Output errors and warnings
        for file_path, file_result in results['file_results'].items():
            if 'error' in file_result:
                print(f"::error file={file_path}::{file_result['error']}")
                continue
            for table_result in file_result.get('table_results', []):
                for error in table_result.get('errors', []):
                    location = ""
                    if error['location']['row']:
                        location = f",line={error['location']['row']}"
                    print(f"::error file={file_path}{location}::{error['message']}")
                for warning in table_result.get('warnings', []):
                    location = ""
                    if warning['location']['row']:
                        location = f",line={warning['location']['row']}"
                    print(f"::warning file={file_path}{location}::{warning['message']}")


def main():
    parser = argparse.ArgumentParser(description='Batch Markdown table validation')
    parser.add_argument('--files', required=True, help='Space-separated list of files to validate')
    parser.add_argument('--config-dir', required=True, help='Directory containing validation configs')
    parser.add_argument('--output-format', choices=['json', 'github-actions'], default='json')
    parser.add_argument('--fail-on-error', action='store_true', help='Exit with error code if validation fails')
    parser.add_argument('--max-workers', type=int, default=4, help='Maximum number of worker threads')
    args = parser.parse_args()

    file_list = args.files.split() if args.files else []
    validator = BatchTableValidator(args.config_dir, args.max_workers)
    results = validator.validate_files(file_list)

    if args.output_format == 'github-actions':
        validator.generate_github_actions_output(results)
    else:
        print(json.dumps(results, indent=2))

    if args.fail_on_error and not results['summary']['validation_passed']:
        sys.exit(1)


if __name__ == "__main__":
    main()
```
Integration with Documentation Systems
Table validation works best as one part of a broader documentation workflow. Run alongside automated testing and validation frameworks, it keeps tabular data consistent across documentation updates, because every change passes through the same checks.
In collaborative editing and version control workflows, pre-commit hooks and automated quality checks catch validation errors before they reach a shared branch, so distributed teams keep a consistent data presentation standard.
On larger documentation platforms, table validation complements content organization and project structure systems: the same validation configs can be applied across a large repository, including complex validation rules and business logic verification.
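The pre-commit hooks mentioned above can start small. The sketch below checks only one structural rule, that every table row has as many cells as its header, for staged Markdown files. It is an illustration under stated assumptions rather than a complete hook: the helper names are hypothetical, it reads the working-tree copy of each file rather than the staged blob, and it ignores escaped pipes and tables inside fenced code blocks.

```python
import subprocess


def staged_markdown_files():
    """List staged .md files (added, copied, or modified) using git."""
    output = subprocess.run(
        ["git", "diff", "--cached", "--name-only", "--diff-filter=ACM"],
        capture_output=True, text=True, check=True,
    ).stdout
    return [name for name in output.splitlines() if name.endswith(".md")]


def column_count(line):
    """Count cells in a pipe-table line, ignoring the outer pipes."""
    cells = line.strip()
    if cells.startswith("|"):
        cells = cells[1:]
    if cells.endswith("|"):
        cells = cells[:-1]
    return len(cells.split("|"))


def find_column_mismatches(text):
    """Return (line_number, found, expected) for rows that differ from their header."""
    mismatches = []
    expected = None
    for number, line in enumerate(text.splitlines(), start=1):
        if line.strip().startswith("|"):
            found = column_count(line)
            if expected is None:
                expected = found  # first line of a table is its header
            elif found != expected:
                mismatches.append((number, found, expected))
        else:
            expected = None  # any non-table line ends the current table
    return mismatches


def main():
    """Return 1 if any staged file has a mismatched row, otherwise 0."""
    failed = False
    for path in staged_markdown_files():
        with open(path, encoding="utf-8") as handle:
            for number, found, expected in find_column_mismatches(handle.read()):
                print(f"{path}:{number}: expected {expected} columns, found {found}")
                failed = True
    return 1 if failed else 0
```

To use it as a hook, the script would end with `sys.exit(main())` and be invoked from `.git/hooks/pre-commit`; a non-zero exit status blocks the commit.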
Performance Optimization and Monitoring
Validation Performance Metrics
Implementing comprehensive performance monitoring for validation systems:
```python
#!/usr/bin/env python3
# validation-performance-monitor.py - Performance monitoring system
import psutil
from typing import Dict, Any, List, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
import statistics


@dataclass
class PerformanceMetrics:
    """Performance metrics for validation operations"""
    operation: str
    start_time: datetime
    end_time: Optional[datetime] = None
    duration_ms: Optional[float] = None
    memory_usage_mb: Optional[float] = None
    cpu_usage_percent: Optional[float] = None
    rows_processed: Optional[int] = None
    errors_found: Optional[int] = None


class ValidationPerformanceMonitor:
    """Performance monitoring for table validation operations"""

    def __init__(self):
        self.metrics_history: List[PerformanceMetrics] = []
        self.active_operations: Dict[str, PerformanceMetrics] = {}
        self.monitoring_enabled = True
        self.process = psutil.Process()
        # Prime the CPU counter: the first cpu_percent() call returns a meaningless 0.0
        self.process.cpu_percent()

    def start_operation(self, operation_id: str, operation_type: str,
                        rows_count: Optional[int] = None) -> Optional[PerformanceMetrics]:
        """Start monitoring a validation operation"""
        if not self.monitoring_enabled:
            return None
        metrics = PerformanceMetrics(
            operation=operation_type,
            start_time=datetime.now(),
            rows_processed=rows_count
        )
        self.active_operations[operation_id] = metrics
        return metrics

    def end_operation(self, operation_id: str, errors_found: int = 0) -> Optional[PerformanceMetrics]:
        """End monitoring and calculate metrics"""
        if operation_id not in self.active_operations:
            return None
        metrics = self.active_operations[operation_id]
        metrics.end_time = datetime.now()
        metrics.duration_ms = (metrics.end_time - metrics.start_time).total_seconds() * 1000
        metrics.errors_found = errors_found

        # Capture resource usage
        try:
            memory_info = self.process.memory_info()
            metrics.memory_usage_mb = memory_info.rss / (1024 * 1024)
            metrics.cpu_usage_percent = self.process.cpu_percent()
        except Exception:
            pass  # Resource monitoring failed

        self.metrics_history.append(metrics)
        del self.active_operations[operation_id]
        return metrics

    def get_performance_summary(self, operation_type: Optional[str] = None,
                                time_window: Optional[timedelta] = None) -> Dict[str, Any]:
        """Generate performance summary report"""
        # Filter metrics based on criteria
        filtered_metrics = self.metrics_history
        if operation_type:
            filtered_metrics = [m for m in filtered_metrics if m.operation == operation_type]
        if time_window:
            cutoff_time = datetime.now() - time_window
            filtered_metrics = [m for m in filtered_metrics if m.start_time >= cutoff_time]
        if not filtered_metrics:
            return {'error': 'No metrics found for criteria'}

        # Calculate statistics
        durations = [m.duration_ms for m in filtered_metrics if m.duration_ms is not None]
        memory_usage = [m.memory_usage_mb for m in filtered_metrics if m.memory_usage_mb is not None]
        rows_processed = [m.rows_processed for m in filtered_metrics if m.rows_processed is not None]
        errors_found = [m.errors_found for m in filtered_metrics if m.errors_found is not None]

        summary = {
            'operation_count': len(filtered_metrics),
            'time_window': str(time_window) if time_window else 'All time',
            'operation_type': operation_type or 'All operations'
        }
        if durations:
            summary['performance'] = {
                'avg_duration_ms': statistics.mean(durations),
                'min_duration_ms': min(durations),
                'max_duration_ms': max(durations),
                'median_duration_ms': statistics.median(durations),
                'p95_duration_ms': self._percentile(durations, 95),
                'p99_duration_ms': self._percentile(durations, 99)
            }
        if memory_usage:
            summary['memory'] = {
                'avg_memory_mb': statistics.mean(memory_usage),
                'peak_memory_mb': max(memory_usage),
                'min_memory_mb': min(memory_usage)
            }
        if rows_processed:
            summary['throughput'] = {
                'total_rows': sum(rows_processed),
                'avg_rows_per_operation': statistics.mean(rows_processed),
                'avg_rows_per_second': self._calculate_throughput(filtered_metrics)
            }
        if errors_found:
            # Guard against division by zero when no rows were recorded
            total_rows = sum(rows_processed)
            summary['quality'] = {
                'total_errors': sum(errors_found),
                'avg_errors_per_operation': statistics.mean(errors_found),
                'error_rate_percent': (sum(errors_found) / total_rows) * 100 if total_rows else 0
            }
        return summary

    def _percentile(self, data: List[float], percentile: int) -> float:
        """Calculate percentile value using linear interpolation"""
        sorted_data = sorted(data)
        index = (percentile / 100) * (len(sorted_data) - 1)
        if index.is_integer():
            return sorted_data[int(index)]
        else:
            lower = sorted_data[int(index)]
            upper = sorted_data[int(index) + 1]
            return lower + (upper - lower) * (index - int(index))

    def _calculate_throughput(self, metrics: List[PerformanceMetrics]) -> float:
        """Calculate average throughput in rows per second"""
        total_rows = 0
        total_time_seconds = 0
        for metric in metrics:
            if metric.rows_processed and metric.duration_ms:
                total_rows += metric.rows_processed
                total_time_seconds += metric.duration_ms / 1000
        return total_rows / total_time_seconds if total_time_seconds > 0 else 0

    def detect_performance_anomalies(self, window_size: int = 10) -> List[Dict[str, Any]]:
        """Detect performance anomalies in recent operations"""
        if len(self.metrics_history) < window_size:
            return []
        recent_metrics = self.metrics_history[-window_size:]
        anomalies = []

        # Calculate baseline performance
        durations = [m.duration_ms for m in recent_metrics if m.duration_ms]
        if len(durations) < 3:
            return anomalies
        avg_duration = statistics.mean(durations)
        stdev_duration = statistics.stdev(durations) if len(durations) > 1 else 0

        # Detect outliers (values beyond 2 standard deviations)
        threshold = avg_duration + (2 * stdev_duration)
        for metric in recent_metrics:
            if metric.duration_ms and metric.duration_ms > threshold:
                anomalies.append({
                    'type': 'slow_operation',
                    'operation': metric.operation,
                    'duration_ms': metric.duration_ms,
                    'expected_max_ms': threshold,
                    'timestamp': metric.start_time.isoformat(),
                    'severity': 'high' if metric.duration_ms > threshold * 1.5 else 'medium'
                })

        # Check for memory anomalies
        memory_values = [m.memory_usage_mb for m in recent_metrics if m.memory_usage_mb]
        if memory_values:
            avg_memory = statistics.mean(memory_values)
            memory_threshold = avg_memory * 2  # 100% increase threshold
            for metric in recent_metrics:
                if metric.memory_usage_mb and metric.memory_usage_mb > memory_threshold:
                    anomalies.append({
                        'type': 'high_memory_usage',
                        'operation': metric.operation,
                        'memory_mb': metric.memory_usage_mb,
                        'expected_max_mb': memory_threshold,
                        'timestamp': metric.start_time.isoformat(),
                        'severity': 'high'
                    })
        return anomalies

    def generate_performance_report(self, include_recommendations: bool = True) -> Dict[str, Any]:
        """Generate comprehensive performance report"""
        overall_summary = self.get_performance_summary()
        recent_summary = self.get_performance_summary(time_window=timedelta(hours=24))
        anomalies = self.detect_performance_anomalies()
        report = {
            'report_timestamp': datetime.now().isoformat(),
            'overall_performance': overall_summary,
            'recent_performance': recent_summary,
            'anomalies': anomalies,
            'operation_types': self._get_operation_type_breakdown()
        }
        if include_recommendations:
            report['recommendations'] = self._generate_recommendations(overall_summary, anomalies)
        return report

    def _get_operation_type_breakdown(self) -> Dict[str, Dict[str, Any]]:
        """Get performance breakdown by operation type"""
        operation_types = set(m.operation for m in self.metrics_history)
        breakdown = {}
        for op_type in operation_types:
            breakdown[op_type] = self.get_performance_summary(operation_type=op_type)
        return breakdown

    def _generate_recommendations(self, summary: Dict[str, Any], anomalies: List[Dict[str, Any]]) -> List[str]:
        """Generate performance improvement recommendations"""
        recommendations = []
        if 'performance' in summary:
            avg_duration = summary['performance']['avg_duration_ms']
            p95_duration = summary['performance']['p95_duration_ms']
            if avg_duration > 1000:  # 1 second
                recommendations.append(
                    "Consider optimizing validation algorithms or implementing parallel processing for large tables"
                )
            if p95_duration > avg_duration * 3:
                recommendations.append(
                    "High variability in processing times detected. Review input data size distribution"
                )
        if 'memory' in summary:
            peak_memory = summary['memory']['peak_memory_mb']
            if peak_memory > 500:  # 500 MB
                recommendations.append(
                    "High memory usage detected. Consider implementing streaming validation for large datasets"
                )
        if 'quality' in summary:
            error_rate = summary['quality']['error_rate_percent']
            if error_rate > 10:
                recommendations.append(
                    "High error rate suggests data quality issues. Consider implementing pre-validation checks"
                )
        if len([a for a in anomalies if a['severity'] == 'high']) > 2:
            recommendations.append(
                "Multiple high-severity performance anomalies detected. Review system resources and validation rules"
            )
        if not recommendations:
            recommendations.append("Performance metrics are within acceptable ranges")
        return recommendations


# Global performance monitor instance
performance_monitor = ValidationPerformanceMonitor()
```
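The `_percentile` helper interpolates linearly between the two nearest ranks. As a standalone sanity check (a sketch, with made-up sample durations), the same rule can be compared against the standard library: `statistics.quantiles` with `method="inclusive"` places its cut points at the same positions, so the two values should agree up to floating-point rounding.

```python
import statistics


def percentile(data, pct):
    """Linear-interpolation percentile, mirroring the _percentile helper."""
    ordered = sorted(data)
    index = (pct / 100) * (len(ordered) - 1)
    lower = int(index)
    if lower == index:
        return ordered[lower]
    fraction = index - lower
    return ordered[lower] + (ordered[lower + 1] - ordered[lower]) * fraction


# Illustrative durations in milliseconds, including one slow outlier.
durations_ms = [12.0, 15.0, 11.0, 240.0, 14.0, 13.0, 16.0, 12.5, 13.5, 14.5]

p95_manual = percentile(durations_ms, 95)
# quantiles(n=100) returns 99 cut points; index 94 is the 95th percentile.
p95_stdlib = statistics.quantiles(durations_ms, n=100, method="inclusive")[94]
```

With only ten samples, the 95th percentile falls between the two largest values, so a single outlier dominates it. That is worth keeping in mind when reading `p95_duration_ms` and `p99_duration_ms` for small operation counts.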
Conclusion
Advanced Markdown table data validation and quality assurance represent essential practices for maintaining reliable, accurate, and accessible tabular content across large-scale documentation projects. By implementing comprehensive validation frameworks, automated error detection systems, and performance monitoring tools, technical writers can ensure consistent data quality while preventing common formatting errors and accessibility violations that compromise user experience.
The key to successful table validation implementation lies in understanding content-specific validation requirements, implementing systematic quality checks that integrate seamlessly with existing workflows, and maintaining performance standards that scale effectively with growing datasets. Whether you’re managing financial reports, technical specifications, or collaborative documentation, the validation techniques and automation strategies covered in this guide provide the foundation for building robust quality assurance systems.
Remember to implement validation rules progressively, starting with critical structural checks before adding sophisticated business logic validation, and ensure that error messages provide clear guidance for content creators to resolve issues quickly. With proper implementation of automated validation workflows, your Markdown tables can achieve enterprise-level data quality standards while maintaining the simplicity and version-control benefits that make Markdown an ideal choice for collaborative technical documentation projects.