Codebase Navigation & Structure (Educational Version)
Note: This is the educational, human-readable version with examples and detailed explanations. For the AI-optimized version, see 1-CONTEXT/_CODEBASE.md.
Codebase Map (AI-Optimized)
Purpose: Quick navigation reference for Claude Code
Token Budget: ~2K tokens (part of 50K Layer 1 budget)
Last Updated: 2025-11-13 (auto-generated)
Root Structure
epgoat-internal/
├── backend/
│ ├── epgoat/ # Python EPG engine (CORE)
│ └── config/ # YAML configuration files
├── frontend/ # Web apps (NOT YET BUILT)
├── .github/workflows/ # CI/CD (NOT YET BUILT)
├── Documentation/ # 151 files (UNDER RENOVATION)
├── CLAUDE.md # AI assistant instructions
├── Makefile # Build commands
├── requirements.txt # Python dependencies
└── pytest.ini # Test configuration
Backend: backend/epgoat/ (CORE)
Purpose: Python 3.11 EPG generation engine
Total Files: 150+ Python files
Test Coverage: 98.2% (770/784 tests passing)
Layer Architecture (Domain-Driven Design)
backend/epgoat/
├── domain/ # Core business logic (models, schemas, patterns)
├── application/ # High-level workflows (epg_generator.py)
├── services/ # 30+ business services (matching, caching, enrichment)
├── infrastructure/ # External dependencies (database, API clients, parsers)
├── cli/ # Command-line interfaces
├── data/ # Data access layer (deprecated, migrating to services)
├── tests/ # Test suite (784 tests)
└── utilities/ # Helper scripts (refresh DB, migrations)
📊 Domain-Driven Design Layer Architecture
graph TB
CLI[CLI Layer<br/>cli/] --> App[Application Layer<br/>application/]
App --> Services[Services Layer<br/>services/]
Services --> Domain[Domain Layer<br/>domain/]
Services --> Infra[Infrastructure Layer<br/>infrastructure/]
Domain --> Models[models.py<br/>schemas.py<br/>patterns.py]
Services --> Enrich[enrichment/<br/>30+ services]
Infra --> DB[database/<br/>clients/<br/>parsers/]
CLI -.->|imports| App
App -.->|imports| Services
Services -.->|imports| Domain
Services -.->|imports| Infra
Domain -.->|no deps| None[❌ No External Deps]
style Domain fill:#90EE90
style Services fill:#FFD700
style App fill:#87CEEB
style CLI fill:#FFB6C1
style Infra fill:#DDA0DD
style None fill:#FF6B6B
Dependency Flow: Top layers depend on bottom layers, never the reverse.
Domain Layer (green): Pure business logic, no external dependencies.
- Models: Event, Channel, Participant
- Schemas: Pydantic validation
- Patterns: Regex patterns for matching

Services Layer (yellow): Business logic orchestration.
- Enrichment pipeline (7 handlers)
- Caching services
- Team name resolution
- Provider config management

Application Layer (blue): High-level workflows.
- EPG generation pipeline
- Orchestration logic

CLI Layer (pink): Command-line interfaces.
- run_provider.py
- onboard_provider.py

Infrastructure Layer (purple): External dependencies.
- Database repositories
- API clients (TheSportsDB, ESPN)
- M3U parsers
Key Rule: Domain layer has ZERO external dependencies. Can be tested without database, API, or file system.
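To see what that buys in practice, here is a minimal sketch of a pure domain test. The import path follows this document's import conventions, and the field names follow the Event example shown later in this section; no fixtures, mocks, or I/O are needed.

```python
from backend.epgoat.domain.models import Event  # import path per this doc's conventions

def test_event_is_pure_data():
    # No database, API, or file system -- just construct and assert:
    event = Event(
        id=1,
        name="Lakers vs Celtics",
        date="2025-11-13",
        league="NBA",
        home_team="Lakers",
        away_team="Celtics",
    )
    assert event.home_team == "Lakers"
    assert event.league == "NBA"
```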
📊 Backend Package Structure (backend/epgoat/)
graph TD
Root[backend/epgoat/] --> Domain[domain/]
Root --> Services[services/]
Root --> App[application/]
Root --> Infra[infrastructure/]
Root --> CLI[cli/]
Root --> Tests[tests/]
Root --> Utils[utilities/]
Domain --> DomainFiles[models.py<br/>schemas.py<br/>patterns.py<br/>parsers.py<br/>xmltv.py<br/>config.py]
Services --> ServicesFiles[30+ service files<br/>enrichment/<br/>caching/<br/>matching/]
Infra --> InfraFiles[database/<br/>clients/<br/>parsers/]
CLI --> CLIFiles[run_provider.py<br/>onboard_provider.py<br/>provider_runner/]
Tests --> TestFiles[test_patterns.py<br/>test_parsers.py<br/>test_integration.py<br/>fixtures/]
style Root fill:#FFD700
style Domain fill:#90EE90
style Services fill:#87CEEB
style Infra fill:#DDA0DD
style CLI fill:#FFB6C1
Navigation Guide:
- domain/: Start here to understand core business logic
- services/: Look here for matching, caching, enrichment
- infrastructure/: External integrations (DB, APIs)
- application/: High-level workflows (EPG generation)
- cli/: Entry points for command-line tools
- tests/: Unit, integration, and E2E tests
- utilities/: Helper scripts (DB refresh, migrations)
Domain Layer: domain/
Purpose: Core business logic, no external dependencies
| File | Purpose | Lines |
|---|---|---|
| models.py | Core data models (Event, Channel, Participant, etc.) | ~400 |
| schemas.py | Pydantic validation schemas | ~300 |
| patterns.py | Channel name regex patterns | ~200 |
| parsers.py | Channel parsing logic (teams, times) | ~400 |
| xmltv.py | XMLTV XML generation | ~300 |
| config.py | Configuration loading | ~150 |
| provider_config.py | Provider-specific configuration models | ~200 |
| datetime_utils.py | Date/time parsing utilities | ~100 |
Key Models:
- Event: Sports event (teams, league, date/time, venue)
- Channel: IPTV channel (name, tvg-id, URL)
- EnrichmentContext: Data passed through matching pipeline
- ProviderConfig: Provider patterns, VOD filters, TVG-ID mappings
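To ground these models, here is a rough sketch of what EnrichmentContext could look like. The fields are inferred from how the handler code later in this document uses the context (channel, matched_event, matched_by); they are assumptions, not copied from the actual context.py.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class EnrichmentContext:
    """Carries one channel through the 7-handler matching pipeline."""
    channel: "Channel"                       # the channel being matched
    matched_event: Optional["Event"] = None  # set by whichever handler matches
    matched_by: Optional[str] = None         # handler class name, for monitoring
```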
📖 Domain Layer Philosophy - Pure Business Logic
The Domain Layer is the heart of EPGOAT's architecture. It contains pure business logic with ZERO external dependencies.
Why Pure Domain Logic?
1. Testability: No mocks needed for domain tests
   - Test Event model without database
   - Test Channel parsing without M3U files
   - Test XMLTV generation without file system
   - Fast tests (milliseconds, not seconds)
2. Portability: Domain logic works anywhere
   - Switch from Supabase to PostgreSQL? Domain unchanged.
   - Switch from TheSportsDB to ESPN? Domain unchanged.
   - Switch from Cloudflare to AWS? Domain unchanged.
3. Understanding: Easy to learn the system
   - Start with domain models (Event, Channel, Participant)
   - No infrastructure noise (APIs, databases, file systems)
   - Pure Python (no external libraries)
4. Longevity: Domain outlives infrastructure
   - APIs change, databases change, frameworks change
   - Business logic (matching, scheduling, XMLTV) is stable
   - The domain layer has the longest lifespan
EPGOAT Domain Files:
- models.py: Core data structures (Event, Channel, M3UEntry)
- schemas.py: Pydantic validation (EventSchema, ChannelSchema)
- patterns.py: Channel regex patterns (ALLOWED_CHANNEL_PATTERNS)
- parsers.py: Channel parsing logic (extract teams, times)
- xmltv.py: XMLTV XML generation
- config.py: Configuration loading
- datetime_utils.py: Date/time parsing utilities
What's NOT in Domain Layer:
- ❌ Database queries
- ❌ API calls
- ❌ File I/O
- ❌ HTTP requests
- ❌ Third-party libraries (except standard library)
Dependency Rule: Domain layer depends on NOTHING external. All other layers depend on domain layer.
Real Example:
# Domain layer (pure logic):
from dataclasses import dataclass

@dataclass
class Event:
    """Sports event (pure data, no dependencies)."""
    id: int
    name: str
    date: str
    league: str
    home_team: str
    away_team: str

# Infrastructure layer (uses domain):
class EventRepository:
    """Manages Event persistence (infrastructure dependency)."""
    def find_by_id(self, id: int) -> Event:
        # Database query (infrastructure); 'db' is an injected connection:
        row = db.query("SELECT * FROM events WHERE id = ?", id)
        # Return domain model (pure):
        return Event(**row)
Benefits in Practice:
- Domain tests run in 0.1s (no database setup)
- Domain models used in CLI, web API, batch jobs (portable)
- Business logic clear and focused
- Infrastructure changes don't break domain
Services Layer: services/ (30+ Services)
Purpose: Business logic, orchestration, caching
📖 Services Layer - Business Logic Orchestration
The Services Layer orchestrates business logic by coordinating domain models and infrastructure components.
Services vs Domain:
| Domain Layer | Services Layer |
|---|---|
| Pure data structures | Business logic orchestration |
| No external dependencies | Uses infrastructure |
| Event, Channel, M3UEntry | EventService, MatchingService |
| Simple operations | Complex workflows |
Service Categories in EPGOAT:
1. Enrichment Services (Core matching logic)
- EnrichmentPipeline: Orchestrates 7-handler chain
- EnhancedMatchCache: Short-term match caching
- CrossProviderCache: Learn from other providers
- RegexMatcher: Multi-stage pattern matching
- TeamNameResolutionService: LLM-based team name resolution
2. Provider Services (Provider management)
- ProviderConfigManager: Load provider configs (YAML + DB)
- ProviderOnboardingService: Auto-discover channel patterns
- ProviderOrchestrator: Parallel provider processing
3. Data Services (Event/participant management)
- EventDeduplicationService: Remove duplicate events (92% reduction)
- ParticipantService: Manage teams/players
- LeagueInferenceService: Infer leagues from channel families
4. Utility Services (Helper functionality)
- CostTracker: Track LLM/API costs
- LogoGenerator: Generate sport logos
- R2StorageService: Upload to Cloudflare R2
- MatchLearner: Self-learning from successful matches
Service Pattern:
class EventService:
"""Event management service (orchestrates domain + infrastructure)."""
def __init__(
self,
repo: EventRepository, # Infrastructure
api: TheSportsDBClient # Infrastructure
) -> None:
self.repo = repo
self.api = api
def get_or_fetch_event(self, event_id: int) -> Event:
"""Get event from DB or fetch from API (orchestration)."""
# Try repository first (cache):
event = self.repo.find_by_id(event_id)
if event:
return event
# Fall back to API:
data = self.api.get_event(event_id)
# Convert to domain model:
event = Event(**data)
# Save to repository:
self.repo.create(event)
return event
Why Services Layer?
- Reusability: Service methods used by CLI, API, batch jobs
- Testing: Mock infrastructure, test business logic
- Separation: Domain stays pure, infrastructure isolated
- Complexity: Hide infrastructure complexity from application layer
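To make the "mock infrastructure, test business logic" point concrete, here is a minimal test sketch against the EventService example above. Both dependencies are swapped for mocks, and the test verifies the API is never touched when the repository has the event.

```python
from unittest.mock import Mock

def test_get_or_fetch_event_prefers_repository():
    repo = Mock()
    api = Mock()
    repo.find_by_id.return_value = Event(
        id=1, name="Lakers vs Celtics", date="2025-11-13",
        league="NBA", home_team="Lakers", away_team="Celtics",
    )

    service = EventService(repo=repo, api=api)
    event = service.get_or_fetch_event(1)

    assert event.league == "NBA"
    api.get_event.assert_not_called()  # repository hit, API untouched
```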
EPGOAT Example - Enrichment Pipeline:
The enrichment pipeline is a perfect service layer component:
- Uses domain models (Channel, Event)
- Orchestrates infrastructure (database, API, cache)
- Implements business logic (matching strategy)
- Testable (mock handlers)
- Reusable (CLI, API, batch jobs)
Enrichment Pipeline (Core Matching Logic)
services/enrichment/
├── pipeline.py # Orchestrates 7-handler chain
├── context.py # EnrichmentContext (data container)
├── factory.py # Handler factory (dependency injection)
├── handlers/ # 7 matching handlers
│ ├── enhanced_match_cache_handler.py # Stage 1: Cache lookup
│ ├── event_details_cache_handler.py # Stage 2: Event details
│ ├── local_database_handler.py # Stage 3: Local events DB
│ ├── regex_handler.py # Stage 4: Regex matching
│ ├── cross_provider_cache_handler.py # Stage 5: Cross-provider
│ ├── api_handler.py # Stage 6: API calls
│ └── fallback_handler.py # Stage 7: LLM fallback
├── services/ # Preprocessing services
│ ├── team_parsing_service.py # Extract team names
│ ├── sport_detection_service.py # Detect sport type
│ ├── league_inference_service.py # Infer league from family
│ └── time_extraction_service.py # Parse date/time
├── observers/ # Cross-cutting concerns
│ └── cost_tracking_observer.py # Track API costs
└── tests/ # 20+ test files
📊 Enrichment Pipeline - 7 Handler Chain
flowchart TD
Start[Channel Input] --> H1[Handler 1:<br/>Enhanced Match Cache]
H1 -->|Cache HIT| Done[Return Match]
H1 -->|Cache MISS| H2[Handler 2:<br/>Event Details Cache]
H2 -->|Cache HIT| Done
H2 -->|Cache MISS| H3[Handler 3:<br/>Local Database Lookup]
H3 -->|DB HIT| Done
H3 -->|DB MISS| H4[Handler 4:<br/>Regex Matcher]
H4 -->|Match FOUND| Done
H4 -->|No Match| H5[Handler 5:<br/>Cross-Provider Cache]
H5 -->|Cache HIT| Done
H5 -->|Cache MISS| H6[Handler 6:<br/>API Handler]
H6 -->|API HIT| Done
H6 -->|API MISS| H7[Handler 7:<br/>LLM Fallback]
H7 -->|LLM Match| Done
H7 -->|No Match| NoMatch[No Match Found]
style H1 fill:#90EE90
style H2 fill:#90EE90
style H3 fill:#FFD700
style H4 fill:#87CEEB
style H5 fill:#FFB6C1
style H6 fill:#DDA0DD
style H7 fill:#FF6B6B
style Done fill:#32CD32
style NoMatch fill:#DC143C
Chain of Responsibility Pattern: Each handler tries to match the channel. If it can't, it passes to the next handler.

Handler Priorities (fastest → slowest):
1. Enhanced Match Cache (O(1) lookup, 0ms)
   - Same-day re-processing optimization
   - 95%+ hit rate for multiple runs per day
2. Event Details Cache (O(1) lookup, 0ms)
   - Cached event details from previous lookups
3. Local Database (SQL query, 10-50ms)
   - Query events table for date/league match
4. Regex Matcher (Pattern matching, 1-5ms)
   - Multi-stage: exact → fuzzy → team extraction
5. Cross-Provider Cache (O(1) lookup, 0ms)
   - Learn from other providers (shared matches)
6. API Handler (HTTP request, 100-500ms)
   - TheSportsDB API call (costs money + time)
7. LLM Fallback (Claude API, 1000-3000ms)
   - Last resort for complex matches (costs $$$ + time)

Performance: 90% of channels matched in handlers 1-4 (< 50ms each)
📖 Chain of Responsibility Pattern - Enrichment Pipeline
The Enrichment Pipeline uses the Chain of Responsibility pattern to implement a flexible, extensible matching strategy.
Pattern Overview:
Chain of Responsibility passes a request through a chain of handlers. Each handler decides whether to process the request or pass it to the next handler.
Benefits:
- Flexibility: Add/remove handlers without changing pipeline
- Priority: Handlers run in priority order (cache → DB → API)
- Early Exit: Stop when match found (no wasted processing)
- Testability: Test each handler independently
- Monitoring: Track which handler found match
EPGOAT Implementation:
from typing import List, Protocol

class EnrichmentHandler(Protocol):
"""Protocol for enrichment handlers (Open/Closed Principle)."""
def handle(self, context: EnrichmentContext) -> EnrichmentContext:
"""Process enrichment context.
Args:
context: Current enrichment context
Returns:
Updated context (with match if found)
"""
...
class EnrichmentPipeline:
"""Orchestrates chain of handlers."""
def __init__(self, handlers: List[EnrichmentHandler]) -> None:
self.handlers = handlers
def enrich(self, channel: Channel) -> EnrichmentContext:
"""Run channel through handler chain."""
context = EnrichmentContext(channel=channel)
for handler in self.handlers:
context = handler.handle(context)
# Early exit if match found:
if context.matched_event:
context.matched_by = handler.__class__.__name__
break
return context
Handler Chain (priority order):
1. EnhancedMatchCacheHandler: O(1) cache lookup (0ms)
2. EventDetailsCacheHandler: Cached event details (0ms)
3. LocalDatabaseHandler: SQL query (10-50ms)
4. RegexHandler: Pattern matching (1-5ms)
5. CrossProviderCacheHandler: Shared cache (0ms)
6. APIHandler: HTTP request (100-500ms)
7. FallbackHandler: LLM call (1000-3000ms)
Performance Optimization:
- 90% of channels matched in handlers 1-4 (< 50ms)
- Expensive handlers (6-7) rarely used
- Each handler adds minimal overhead if not matched
Adding New Handler:
# 1. Implement EnrichmentHandler protocol:
class NewHandler:
def handle(self, context: EnrichmentContext) -> EnrichmentContext:
if context.matched_event:
return context # Already matched, skip
# Your matching logic here:
match = your_matching_logic(context.channel)
if match:
context.matched_event = match
return context
# 2. Add to factory:
class HandlerFactory:
def create_handlers(self) -> List[EnrichmentHandler]:
return [
EnhancedMatchCacheHandler(),
NewHandler(), # Insert in priority order
LocalDatabaseHandler(),
# ...
]
# 3. Done! No changes to pipeline code (Open/Closed Principle)
Monitoring Example:
Channel: "NBA 01: Lakers vs Celtics"
├─ EnhancedMatchCacheHandler: MISS (0ms)
├─ EventDetailsCacheHandler: MISS (0ms)
├─ LocalDatabaseHandler: HIT (25ms) ✅
└─ Matched by: LocalDatabaseHandler
Channel: "NFL 05: Patriots vs Cowboys"
├─ EnhancedMatchCacheHandler: HIT (0ms) ✅
└─ Matched by: EnhancedMatchCacheHandler
Key Takeaway: Chain of Responsibility makes EPGOAT's matching strategy flexible, fast, and easy to extend.
📊 EPG Generation Data Flow
flowchart LR
M3U[M3U Playlist] --> Parser[M3U Parser]
Parser --> VOD[VOD Filter<br/>91.7% reduction]
VOD --> Channels[Channel List]
Channels --> Enrich[Enrichment Pipeline<br/>7 handlers]
DB[(Events DB)] --> Enrich
API[TheSportsDB API] --> Enrich
Cache[Match Cache] --> Enrich
Enrich --> Matches[Matched Events]
Matches --> Schedule[Schedule Builder<br/>Pre/Live/Post blocks]
Schedule --> XMLTV[XMLTV Generator]
XMLTV --> Output[EPG XML File]
Matches --> SaveDB[Save to Database]
style M3U fill:#E8F5E9
style Output fill:#E8F5E9
style DB fill:#FFF3E0
style API fill:#E1F5FE
style Cache fill:#F3E5F5
Complete EPG Generation Flow:
1. Input: M3U playlist from provider URL
2. Parse: Extract channel metadata (name, tvg-id, group)
3. Filter: Remove VOD channels (91.7% reduction)
4. Enrich: Match channels to events (7-handler pipeline)
5. Schedule: Generate programme blocks (pre/live/post)
6. Output: XMLTV XML file for IPTV players
7. Save: Store matches in database for caching

Performance: 100-500 channels processed in 30-120 seconds
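A condensed sketch of that flow in code. The helper names (download, parse_m3u, is_vod, build_schedule, save_matches) are illustrative assumptions, not the actual signatures in application/epg_generator.py; pipeline.enrich and generate_xmltv follow the APIs shown elsewhere in this document.

```python
def generate_epg_sketch(m3u_url: str, pipeline: "EnrichmentPipeline") -> str:
    channels = parse_m3u(download(m3u_url))            # steps 1-2: input + parse
    channels = [c for c in channels if not is_vod(c)]  # step 3: VOD filter
    contexts = [pipeline.enrich(c) for c in channels]  # step 4: 7-handler enrichment
    matched = [ctx for ctx in contexts if ctx.matched_event]
    programmes = build_schedule(matched)               # step 5: pre/live/post blocks
    save_matches(matched)                              # step 7: persist for caching
    return generate_xmltv(programmes)                  # step 6: XMLTV output
```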
Key Services
| Service | Purpose | File |
|---|---|---|
| Provider Config Manager | Load provider configs (YAML cache + DB) | provider_config_manager.py |
| Provider Onboarding | Auto-discover channel patterns | provider_onboarding_service.py |
| Provider Orchestrator | Parallel provider processing | provider_orchestrator.py |
| Team Name Resolution | LLM-based canonical team names | team_name_resolution_service.py |
| Enhanced League Inference | Map families to leagues | enhanced_league_inference.py |
| Enhanced Match Cache | High-performance match cache | enhanced_match_cache.py |
| Cross-Provider Cache | Learn from other providers | cross_provider_cache.py |
| Event Deduplication | Remove duplicate events | event_deduplication.py |
| Regex Matcher | Multi-stage pattern matching | regex_matcher.py |
| API Enrichment | External API calls (deprecated, replaced by enrichment/) | api_enrichment.py |
| Cost Tracker | Track LLM/API costs | cost_tracker.py |
| Logo Generator | Generate sport logos | logo_generator.py |
| R2 Storage | Cloudflare R2 uploads | core/r2_storage.py |
| GitHub Issue Service | Auto-create validation issues | integrations/github_issue_service.py |
| Match Learner | Self-learning from successful matches | match_learner.py |
| Schedulers | Generate EPG programme blocks | schedulers.py |
Infrastructure Layer: infrastructure/
Purpose: External dependencies (databases, APIs, parsers)
Database: infrastructure/database/
database/
├── connection.py # Supabase PostgreSQL connection
├── migration_runner.py # Run SQL migrations
├── migrate.py # Migration CLI
├── migrations/ # 17 numbered SQL migrations
│ ├── 001_initial_schema.sql
│ ├── 017_simplify_team_discovery.sql # Latest (2025-11-07)
│ └── ...
├── repositories/ # Data access patterns
│ ├── event_repository.py # Event CRUD
│ ├── participant_repository.py # Participant CRUD
│ ├── unmatched_channel_repository.py # Unmatched channels
│ ├── supabase_event_repository.py # Supabase-specific implementation
│ └── base_repository.py # Base class with soft deletes
└── postgrest_cache.py # PostgREST caching layer
💡 Repository Pattern - BaseRepository Example
"""Base Repository class with common CRUD operations.
All repository classes should inherit from this to get standard database operations.
Implements soft delete pattern (Core Principle #4: Data is Forever).
"""
from typing import Any, List, Optional
# Whitelist of allowed table names (SQL injection prevention)
ALLOWED_TABLES = {
"events",
"participants",
"unmatched_channels",
"match_cache",
"providers",
}
class BaseRepository:
"""Base repository with soft delete support.
Provides standard CRUD operations for all repositories.
Enforces soft delete pattern (never hard delete).
Example:
>>> class EventRepository(BaseRepository):
... def __init__(self, connection):
... super().__init__(connection, "events")
...
>>> repo = EventRepository(db_connection)
>>> event = repo.find_by_id(123)
>>> repo.delete(123) # Soft delete (sets record_status='deleted')
"""
def __init__(self, connection: Any, table_name: str) -> None:
"""Initialize repository with database connection.
Args:
connection: Database connection object
table_name: Name of database table
Raises:
ValueError: If table_name not in ALLOWED_TABLES (SQL injection prevention)
"""
if table_name not in ALLOWED_TABLES:
raise ValueError(
f"Table '{table_name}' not in ALLOWED_TABLES. "
f"Add to whitelist if this is a valid table."
)
self.connection = connection
self.table_name = table_name
def find_by_id(self, id: int) -> Optional[Any]:
"""Find record by ID (only active records).
Args:
id: Record ID
Returns:
Record if found and active, None otherwise
"""
query = f"""
SELECT * FROM {self.table_name}
WHERE id = ?
AND record_status = 'active'
"""
return self._query_one(query, id)
def find_all(self, limit: int = 100) -> List[Any]:
"""Find all active records.
Args:
limit: Maximum number of records to return
Returns:
List of active records
"""
query = f"""
SELECT * FROM {self.table_name}
WHERE record_status = 'active'
ORDER BY created_at DESC
LIMIT ?
"""
return self._query(query, limit)
def create(self, data: dict) -> int:
"""Create new record.
Args:
data: Record data as dictionary
Returns:
ID of created record
"""
columns = ", ".join(data.keys())
placeholders = ", ".join("?" * len(data))
query = f"""
INSERT INTO {self.table_name} ({columns}, record_status, created_at)
VALUES ({placeholders}, 'active', CURRENT_TIMESTAMP)
"""
cursor = self._execute(query, *data.values())
return cursor.lastrowid
def update(self, id: int, data: dict) -> None:
"""Update existing record.
Args:
id: Record ID
data: Fields to update
"""
set_clause = ", ".join(f"{k} = ?" for k in data.keys())
query = f"""
UPDATE {self.table_name}
SET {set_clause}, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
AND record_status = 'active'
"""
self._execute(query, *data.values(), id)
def delete(self, id: int) -> None:
"""Soft delete record (Core Principle #4: Data is Forever).
NEVER hard deletes. Sets record_status='deleted' instead.
Args:
id: Record ID to delete
"""
query = f"""
UPDATE {self.table_name}
SET record_status = 'deleted',
updated_at = CURRENT_TIMESTAMP
WHERE id = ?
"""
self._execute(query, id)
def _query(self, query: str, *params) -> List[Any]:
"""Execute query and return all results."""
cursor = self.connection.execute(query, params)
return cursor.fetchall()
def _query_one(self, query: str, *params) -> Optional[Any]:
"""Execute query and return single result."""
cursor = self.connection.execute(query, params)
return cursor.fetchone()
def _execute(self, query: str, *params) -> Any:
"""Execute query without returning results."""
return self.connection.execute(query, params)
Repository Pattern Benefits:
- Abstraction: Hide database implementation details
- Testability: Easy to mock for tests
- Consistency: All repositories use same patterns
- Soft Deletes: Enforced at base class level
- SQL Injection Prevention: Table name whitelist
Usage in EPGOAT:
- EventRepository: Manages events table
- ParticipantRepository: Manages participants table
- UnmatchedChannelRepository: Manages unmatched_channels table
All inherit from BaseRepository for consistent behavior.
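To show the pattern in use, here is a minimal sketch that mirrors the docstring example above, with an in-memory SQLite table standing in for the real Supabase connection. The schema is trimmed to just the columns BaseRepository touches.

```python
import sqlite3

class EventRepository(BaseRepository):
    def __init__(self, connection: sqlite3.Connection) -> None:
        super().__init__(connection, "events")  # "events" is in ALLOWED_TABLES

# In-memory table with the columns BaseRepository expects:
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE events (id INTEGER PRIMARY KEY, name TEXT, "
    "record_status TEXT, created_at TEXT, updated_at TEXT)"
)

repo = EventRepository(conn)
repo.create({"id": 1, "name": "Lakers vs Celtics"})
assert repo.find_by_id(1) is not None  # active record is visible
repo.delete(1)                         # soft delete: record_status='deleted'
assert repo.find_by_id(1) is None      # deleted record is filtered out
```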
💡 Real EPGOAT Class Example - EnhancedMatchCache
#!/usr/bin/env python3
"""Enhanced Match Cache (P2-012) - Layer 2.
Provides short-term match caching with dual lookup strategy (tvg-id OR channel_name)
for same-day re-processing optimization.
Key Features:
- Dual lookup strategy: Priority 1 (tvg-id), Priority 2 (channel_name)
- Cache expiration: 24-48h TTL
- Same-day re-processing optimization (6am → 12pm → 6pm)
- Metrics tracking for cache hit rate analysis
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, Optional
@dataclass
class CachedMatch:
"""Cached match result with dual identifiers.
Attributes:
tvg_id: TVG-ID from M3U (if available)
channel_name: Channel display name
channel_family: Channel family/category (e.g., "NBA", "NFL")
channel_payload: Extracted payload (e.g., "Lakers vs Celtics")
matched_event_id: ID of matched event from API
league: League identifier (e.g., "NBA", "Premier League")
sport: Sport name (e.g., "Basketball", "Soccer")
confidence: Match confidence score (0.0-1.0)
cached_at: When this match was cached
"""
tvg_id: str | None
channel_name: str
channel_family: str
channel_payload: str
matched_event_id: int
league: str
sport: str
confidence: float
cached_at: datetime = field(default_factory=datetime.now)
class EnhancedMatchCache:
"""Enhanced match cache with dual lookup strategy.
Provides short-term caching (24-48h) for same-day re-processing optimization.
Uses dual lookup strategy: tvg-id (priority 1) OR channel_name (priority 2).
Benefits:
- 95%+ cache hit rate for same-day re-processing
- Reduces API calls for multiple EPG generations per day
- Supports providers with and without tvg-ids
- Automatic expiration prevents stale data
Example:
>>> cache = EnhancedMatchCache(expiration_hours=24)
>>>
>>> # Store match:
>>> cache.store_match(
... tvg_id="nba-lakers-123",
... channel_name="NBA 01: Lakers vs Celtics",
... channel_family="NBA",
... channel_payload="Lakers vs Celtics",
... matched_event_id=12345,
... league="NBA",
... sport="Basketball",
... confidence=0.95
... )
>>>
>>> # Find match (fast lookup):
>>> result = cache.find_match(tvg_id="nba-lakers-123")
>>> result.matched_event_id
12345
"""
def __init__(self, expiration_hours: int = 24) -> None:
"""Initialize cache with expiration time.
Args:
expiration_hours: How long to cache matches (default: 24 hours)
"""
self._cache: Dict[str, CachedMatch] = {}
self._expiration_hours = expiration_hours
self._hits = 0
self._misses = 0
def find_match(
self,
tvg_id: Optional[str] = None,
channel_name: Optional[str] = None
) -> Optional[CachedMatch]:
"""Find cached match by tvg-id or channel name.
Dual lookup strategy:
1. Try tvg-id first (faster, more reliable)
2. Fall back to channel_name (works without tvg-id)
Args:
tvg_id: TVG-ID to lookup (priority 1)
channel_name: Channel name to lookup (priority 2)
Returns:
CachedMatch if found and not expired, None otherwise
"""
# Try tvg-id first (priority 1):
if tvg_id:
match = self._cache.get(f"tvg:{tvg_id}")
if match and not self._is_expired(match):
self._hits += 1
return match
# Fall back to channel name (priority 2):
if channel_name:
match = self._cache.get(f"name:{channel_name}")
if match and not self._is_expired(match):
self._hits += 1
return match
self._misses += 1
return None
def store_match(
self,
tvg_id: Optional[str],
channel_name: str,
channel_family: str,
channel_payload: str,
matched_event_id: int,
league: str,
sport: str,
confidence: float
) -> None:
"""Store match in cache with dual keys.
Stores under both tvg-id and channel_name for dual lookup support.
Args:
tvg_id: TVG-ID from M3U (optional)
channel_name: Channel display name (required)
channel_family: Channel family (e.g., "NBA")
channel_payload: Extracted payload (e.g., "Lakers vs Celtics")
matched_event_id: ID of matched event
league: League identifier
sport: Sport name
confidence: Match confidence (0.0-1.0)
"""
match = CachedMatch(
tvg_id=tvg_id,
channel_name=channel_name,
channel_family=channel_family,
channel_payload=channel_payload,
matched_event_id=matched_event_id,
league=league,
sport=sport,
confidence=confidence
)
# Store under tvg-id if available:
if tvg_id:
self._cache[f"tvg:{tvg_id}"] = match
# Always store under channel name:
self._cache[f"name:{channel_name}"] = match
def _is_expired(self, match: CachedMatch) -> bool:
"""Check if cached match has expired.
Args:
match: Cached match to check
Returns:
True if expired, False otherwise
"""
age = datetime.now() - match.cached_at
return age > timedelta(hours=self._expiration_hours)
def get_hit_rate(self) -> float:
"""Calculate cache hit rate.
Returns:
Hit rate as percentage (0.0-1.0)
"""
total = self._hits + self._misses
return self._hits / total if total > 0 else 0.0
Key Patterns Demonstrated:
- Type Hints: 100% coverage (every parameter and return)
- Docstrings: Google style for all public methods
- Dataclasses: Clean data structures with defaults
- Private Methods: _is_expired() marked with underscore
- Dual Lookup: Flexible strategy (tvg-id OR channel_name)
- Metrics: Track hits/misses for performance analysis
- Expiration: Automatic cache invalidation
Real Performance:
- 95%+ hit rate for same-day re-processing
- O(1) lookup time (dict lookup)
- Saves 100+ API calls per run (100-500ms each)
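An illustrative run of the metrics API from the class above, showing how hits and misses feed get_hit_rate():

```python
cache = EnhancedMatchCache(expiration_hours=24)

cache.find_match(tvg_id="nba-lakers-123")  # MISS: cache is empty
cache.store_match(
    tvg_id="nba-lakers-123",
    channel_name="NBA 01: Lakers vs Celtics",
    channel_family="NBA",
    channel_payload="Lakers vs Celtics",
    matched_event_id=12345,
    league="NBA",
    sport="Basketball",
    confidence=0.95,
)
cache.find_match(tvg_id="nba-lakers-123")                   # HIT via tvg-id
cache.find_match(channel_name="NBA 01: Lakers vs Celtics")  # HIT via name fallback

print(f"Hit rate: {cache.get_hit_rate():.0%}")  # 2 hits / 3 lookups = 67%
```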
API Clients: infrastructure/clients/
| Client | Purpose | File |
|---|---|---|
| TheSportsDB | Primary event database | api_client.py |
| ESPN API | Fallback event data | espn_api_client.py |
| TV Schedule API | Alternate event source | tv_schedule_client.py |
| Tracked Clients | API call tracking wrapper | tracked_api_clients.py |
Parsers: infrastructure/parsers/
| Parser | Purpose | File |
|---|---|---|
| Provider M3U Parser | Parse M3U with provider-specific logic | provider_m3u_parser.py |
| VOD Detector | Filter VOD channels (91.7% reduction) | vod_detector.py |
| Channel Parser | Extract channel metadata | channel_parser.py |
| Event Matcher | Match channels to events (deprecated) | event_matcher.py |
Application Layer: application/
Purpose: High-level workflows, orchestration
| File | Purpose |
|---|---|
| epg_generator.py | Main EPG generation pipeline |
CLI Layer: cli/
Purpose: Command-line interfaces
| Script | Purpose | Command |
|---|---|---|
| run_provider.py | Generate EPG for one provider | python cli/run_provider.py --provider tps |
| onboard_provider.py | Onboard new provider (pattern discovery) | python cli/onboard_provider.py --provider necro |
| provider_runner/ | Provider execution framework | (used by run_provider.py) |
Utilities: utilities/
Purpose: Helper scripts for manual operations
| Script | Purpose |
|---|---|
| refresh_event_db_v2.py | Refresh events from TheSportsDB |
| refresh_leagues.py | Refresh league data |
| analyze_tps_m3u.py | Analyze M3U playlist |
| migrate_database.py | Run database migrations |
Run from: backend/epgoat/ directory
Configuration: backend/config/
Purpose: YAML configuration files
Provider Configs: config/providers/
providers/
└── tps.yml # TPS provider config (72 patterns, VOD filters, TVG-IDs)
# Auto-generated from database by provider_config_manager.py
# 24-hour cache TTL
📊 Provider Configuration Loading Flow
flowchart TD
Request[Load Provider Config] --> Cache{YAML Cache Exists?}
Cache -->|Yes| Age{Cache < 24h old?}
Age -->|Yes| LoadYAML[Load from YAML<br/>⚡ 53x faster]
Age -->|No| FetchDB
Cache -->|No| FetchDB[Fetch from Database<br/>🐢 Slower]
FetchDB --> BuildYAML[Build YAML Structure]
BuildYAML --> WriteCache[Write YAML Cache]
WriteCache --> LoadYAML
LoadYAML --> Config[ProviderConfig Object]
style LoadYAML fill:#90EE90
style FetchDB fill:#FFD700
style Config fill:#87CEEB
Hybrid Configuration Strategy:
- Database: Source of truth (patterns, VOD filters, TVG-IDs)
- YAML Cache: Performance optimization (53x faster loading)
- 24-hour TTL: Balance freshness vs performance
File Location: backend/config/providers/<provider_slug>.yml
Example: TPS provider config has 72 patterns, VOD filters, TVG-ID mappings
Benefits:
- Database changes auto-sync to YAML within 24 hours
- Fast loads during development (YAML)
- Single source of truth (database)
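A minimal sketch of the cache-or-fetch decision above. load_yaml, write_yaml, and fetch_config_from_database are hypothetical helpers, not the actual ProviderConfigManager API; the path and 24-hour TTL come from this document.

```python
import os
import time

CACHE_TTL_SECONDS = 24 * 3600  # 24-hour TTL per the strategy above

def load_provider_config(slug: str) -> dict:
    cache_path = f"backend/config/providers/{slug}.yml"
    if os.path.exists(cache_path):
        age_seconds = time.time() - os.path.getmtime(cache_path)
        if age_seconds < CACHE_TTL_SECONDS:
            return load_yaml(cache_path)       # fast path (~53x faster)
    config = fetch_config_from_database(slug)  # slow path: source of truth
    write_yaml(cache_path, config)             # refresh the YAML cache
    return config
```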
Global Configs
| File | Purpose | Count |
|---|---|---|
| sport_emojis.yml | Sport → emoji mappings | 89 sports |
| sport_categories.yml | Sport → XMLTV category mappings | 77 sports |
| channel_patterns.yml | Global channel regex patterns | ~50 patterns |
| matching_config.yml | Matching pipeline configuration | N/A |
| api_config.yml | API client configuration | N/A |
| family_mappings/universal.yml | Universal family-league mappings | N/A |
| family_mappings/tps.yml | TPS-specific family-league mappings | N/A |
Tests: tests/
Purpose: Test suite (pytest)
tests/
├── test_patterns.py # Pattern matching tests
├── test_parsers.py # M3U parsing tests
├── test_schedulers.py # Programme scheduling tests
├── test_config.py # Configuration loading tests
├── test_schemas.py # Schema validation tests
├── test_integration.py # End-to-end workflow tests
├── conftest.py # Pytest fixtures
└── fixtures/ # Test data
├── sample.m3u # Sample playlist
└── 2025-11-01-channel-names-1386.csv # Real TPS channels
Run: make test (from project root) or pytest (from backend/epgoat/)
Coverage: 98.2% (14 failures due to cache bugs, not restructure issues)
📊 Test Suite Organization
graph TD
Tests[tests/] --> Unit[Unit Tests<br/>~700 tests]
Tests --> Integration[Integration Tests<br/>~50 tests]
Tests --> E2E[End-to-End Tests<br/>~10 tests]
Unit --> UnitFiles[test_patterns.py<br/>test_parsers.py<br/>test_models.py<br/>test_schemas.py]
Integration --> IntFiles[test_enrichment.py<br/>test_services.py<br/>test_caching.py]
E2E --> E2EFiles[test_integration.py<br/>test_epg_generation.py]
Tests --> Fixtures[fixtures/<br/>conftest.py]
Fixtures --> FixFiles[sample.m3u<br/>sample_events.json<br/>test_providers.yml]
style Unit fill:#90EE90
style Integration fill:#FFD700
style E2E fill:#FFB6C1
style Fixtures fill:#87CEEB
Test Pyramid Distribution:
1. Unit Tests (90%): Fast, isolated, focused
   - Test individual functions/classes
   - Mock external dependencies
   - Run in milliseconds
2. Integration Tests (7%): Medium speed, multi-component
   - Test component interactions
   - Use real services (with mocked externals)
   - Run in seconds
3. End-to-End Tests (3%): Slow, comprehensive
   - Test complete workflows
   - Minimal mocking
   - Run in 10-30 seconds each

Run Commands:
- pytest tests/test_patterns.py (specific file)
- pytest tests/test_patterns.py::test_nba_pattern (specific test)
- pytest -k "match" (tests matching "match")
- pytest -v (verbose output)
Common File Locations
When working on...
| Task | Look Here |
|---|---|
| Adding new sport | config/sport_emojis.yml, config/sport_categories.yml |
| Adding channel pattern | config/channel_patterns.yml or provider config |
| Modifying match logic | services/enrichment/handlers/ |
| Adding new handler | services/enrichment/handlers/ + factory.py |
| Database queries | infrastructure/database/repositories/ |
| Schema changes | infrastructure/database/migrations/ (new numbered file) |
| API integration | infrastructure/clients/ |
| M3U parsing | infrastructure/parsers/provider_m3u_parser.py |
| XMLTV generation | domain/xmltv.py |
| Team name resolution | services/team_name_resolution_service.py |
| Provider onboarding | services/provider_onboarding_service.py |
| Configuration loading | services/provider_config_manager.py |
💡 Finding Files - Common Tasks
# ============================================================
# Task: Add new sport emoji
# ============================================================
# File: backend/config/sport_emojis.yml
# Add: Cricket: 🏏
# ============================================================
# Task: Add channel pattern for new league
# ============================================================
# Option 1: Provider-specific (preferred)
# File: backend/config/providers/tps.yml
# Add to patterns section:
# - pattern: '^IPL\s+\d+\s*:?'
# sport_family: 'IPL'
# priority: 100
# Option 2: Global patterns
# File: backend/config/channel_patterns.yml
# Add to patterns list
# ============================================================
# Task: Modify match logic
# ============================================================
# Files: backend/epgoat/services/enrichment/handlers/
# - regex_handler.py (pattern matching)
# - api_handler.py (API calls)
# - local_database_handler.py (DB lookups)
# ============================================================
# Task: Add new database table
# ============================================================
# Step 1: Create migration
cd backend/epgoat/infrastructure/database/migrations
# Create: 018_add_new_table.sql
# Step 2: Run migration
cd ../
python migration_runner.py
# Step 3: Create repository
cd repositories/
# Create: new_table_repository.py (inherit from BaseRepository)
# ============================================================
# Task: Add tests for new feature
# ============================================================
# Unit tests: backend/epgoat/tests/test_<module_name>.py
# Integration tests: backend/epgoat/tests/test_integration.py
# ============================================================
# Task: Debug EPG generation issue
# ============================================================
# Entry point: backend/epgoat/cli/run_provider.py
# Pipeline: backend/epgoat/application/epg_generator.py
# Handlers: backend/epgoat/services/enrichment/handlers/
# Output: backend/epgoat/output/<provider>_<date>.xml
# ============================================================
# Task: Update provider configuration
# ============================================================
# Database: Update provider_patterns table (source of truth)
# YAML cache: backend/config/providers/<provider>.yml (auto-generated)
# Manager: backend/epgoat/services/provider_config_manager.py
# ============================================================
# Task: Check current database schema
# ============================================================
# Current schema: backend/epgoat/infrastructure/database/migrations/017_*.sql
# Base repository: backend/epgoat/infrastructure/database/repositories/base_repository.py
# All repositories: backend/epgoat/infrastructure/database/repositories/
File Organization Principles:
- Configuration: backend/config/ for YAML files
- Business Logic: backend/epgoat/services/ for services
- Data Models: backend/epgoat/domain/ for domain models
- Database: backend/epgoat/infrastructure/database/
- CLI Tools: backend/epgoat/cli/ for command-line scripts
- Tests: backend/epgoat/tests/ for test suite
Quick Find Commands:
# Find by filename:
find backend/epgoat -name "event_repository.py"
# Find by content:
grep -r "EnhancedMatchCache" backend/epgoat/
# Find by pattern:
find backend/epgoat -name "*cache*.py"
Import Patterns
Example imports (follow these patterns):
# Domain layer (no external deps)
from backend.epgoat.domain.models import Event, Channel
from backend.epgoat.domain.schemas import EventSchema
from backend.epgoat.domain.patterns import ALLOWED_CHANNEL_PATTERNS
# Services layer
from backend.epgoat.services.enhanced_match_cache import EnhancedMatchCache
from backend.epgoat.services.enrichment.pipeline import EnrichmentPipeline
from backend.epgoat.services.provider_config_manager import ProviderConfigManager
# Infrastructure layer
from backend.epgoat.infrastructure.database.connection import get_database_connection
from backend.epgoat.infrastructure.clients.api_client import TheSportsDBClient
from backend.epgoat.infrastructure.parsers.provider_m3u_parser import ProviderM3UParser
# Application layer
from backend.epgoat.application.epg_generator import generate_epg
💡 Import Patterns - By Layer
# ============================================================
# Domain Layer Imports (NO external dependencies)
# ============================================================
from backend.epgoat.domain.models import (
Event,
Channel,
M3UEntry,
Participant,
create_channel
)
from backend.epgoat.domain.schemas import (
EventSchema,
ChannelSchema,
ProviderConfigSchema
)
from backend.epgoat.domain.patterns import ALLOWED_CHANNEL_PATTERNS
from backend.epgoat.domain.xmltv import generate_xmltv
# ============================================================
# Services Layer Imports
# ============================================================
from backend.epgoat.services.enhanced_match_cache import EnhancedMatchCache
from backend.epgoat.services.cross_provider_cache import CrossProviderCache
from backend.epgoat.services.enrichment.pipeline import EnrichmentPipeline
from backend.epgoat.services.enrichment.factory import HandlerFactory
from backend.epgoat.services.provider_config_manager import ProviderConfigManager
from backend.epgoat.services.team_name_resolution_service import TeamNameResolutionService
# ============================================================
# Infrastructure Layer Imports
# ============================================================
from backend.epgoat.infrastructure.database.connection import get_database_connection
from backend.epgoat.infrastructure.database.repositories.event_repository import EventRepository
from backend.epgoat.infrastructure.clients.api_client import TheSportsDBClient
from backend.epgoat.infrastructure.parsers.provider_m3u_parser import ProviderM3UParser
# ============================================================
# Application Layer Imports
# ============================================================
from backend.epgoat.application.epg_generator import generate_epg
# ============================================================
# Usage Example - Dependency Injection
# ============================================================
def create_epg_pipeline(provider_slug: str) -> EnrichmentPipeline:
"""Create EPG pipeline with all dependencies (DDD pattern).
Demonstrates proper dependency injection and layer separation.
"""
# Infrastructure layer (external dependencies):
db_connection = get_database_connection()
api_client = TheSportsDBClient(api_key=get_api_key())
event_repo = EventRepository(connection=db_connection)
# Services layer (business logic):
config_manager = ProviderConfigManager(connection=db_connection)
match_cache = EnhancedMatchCache(expiration_hours=24)
cross_cache = CrossProviderCache(expiration_hours=48)
# Create handler factory:
factory = HandlerFactory(
event_repo=event_repo,
api_client=api_client,
match_cache=match_cache,
cross_cache=cross_cache
)
# Application layer (orchestration):
pipeline = EnrichmentPipeline(
handlers=factory.create_handlers(),
config_manager=config_manager
)
return pipeline
Import Guidelines:
1. Absolute Imports: Always use from backend.epgoat.X import Y
   - Never use relative imports (from ..models import Event)
   - Works from any directory
2. Layer Separation: Import from same layer or lower layers only
   - ✅ Services → Domain (ok)
   - ✅ Application → Services (ok)
   - ❌ Domain → Services (VIOLATION)
   - ❌ Services → Application (VIOLATION)
3. Dependency Injection: Pass dependencies via constructor
   - Don't import concrete classes in domain layer
   - Use protocols for abstractions
4. Type Hints: Import types for annotations
   - from typing import List, Dict, Optional
   - Use for function signatures
Key Design Patterns
- Domain-Driven Design: Clear separation of domain, application, services, infrastructure
- Chain of Responsibility: Enrichment pipeline (7 handlers)
- Factory Pattern: Handler factory for dependency injection
- Repository Pattern: Data access abstraction
- Observer Pattern: Cost tracking, performance monitoring
- Strategy Pattern: Multiple matching strategies (regex, API, LLM)
- Service Layer: Business logic separate from infrastructure
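As a sketch of the Observer pattern listed above, here is how cost tracking can observe the pipeline without touching matching logic. The real implementation lives in services/enrichment/observers/cost_tracking_observer.py; these names and the notification hook are illustrative assumptions.

```python
from typing import Protocol

class PipelineObserver(Protocol):
    def on_handler_finished(self, handler_name: str, cost_usd: float) -> None: ...

class CostTrackingObserver:
    """Accumulates API/LLM spend as a cross-cutting concern."""

    def __init__(self) -> None:
        self.total_cost_usd = 0.0

    def on_handler_finished(self, handler_name: str, cost_usd: float) -> None:
        self.total_cost_usd += cost_usd

# The pipeline only has to notify observers after each handler runs, e.g.:
#   for observer in self.observers:
#       observer.on_handler_finished(handler.__class__.__name__, cost)
```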
Performance Hot Paths
Critical for speed:
1. VOD filtering: infrastructure/parsers/vod_detector.py (91.7% reduction)
2. Provider config loading: services/provider_config_manager.py (53x faster with YAML cache)
3. Enhanced match cache: services/enhanced_match_cache.py (O(1) lookup)
4. Event deduplication: services/event_deduplication.py (92% API call reduction)
5. Regex matcher: services/regex_matcher.py (multi-stage: exact → fuzzy)
For Deep Dives: Use IDE "Go to Definition" or grep -r "class_name" backend/epgoat/
For Architecture: See Documentation/03-Architecture/System-Overview.md