Codebase Navigation & Structure

EPGOAT Documentation - AI Reference (Educational)

Codebase Navigation & Structure (Educational Version)

Note: This is the educational, human-readable version with examples and detailed explanations. For the AI-optimized version, see 1-CONTEXT/_CODEBASE.md.


Codebase Map (AI-Optimized)

Purpose: Quick navigation reference for Claude Code
Token Budget: ~2K tokens (part of 50K Layer 1 budget)
Last Updated: 2025-11-13 (auto-generated)


Root Structure

epgoat-internal/
├── backend/
│   ├── epgoat/          # Python EPG engine (CORE)
│   └── config/          # YAML configuration files
├── frontend/            # Web apps (NOT YET BUILT)
├── .github/workflows/   # CI/CD (NOT YET BUILT)
├── Documentation/       # 151 files (UNDER RENOVATION)
├── CLAUDE.md            # AI assistant instructions
├── Makefile             # Build commands
├── requirements.txt     # Python dependencies
└── pytest.ini           # Test configuration

Backend: backend/epgoat/ (CORE)

Purpose: Python 3.11 EPG generation engine
Total Files: ~150+ Python files
Test Pass Rate: 98.2% (770/784 tests passing)

Layer Architecture (Domain-Driven Design)

backend/epgoat/
├── domain/              # Core business logic (models, schemas, patterns)
├── application/         # High-level workflows (epg_generator.py)
├── services/            # 30+ business services (matching, caching, enrichment)
├── infrastructure/      # External dependencies (database, API clients, parsers)
├── cli/                 # Command-line interfaces
├── data/                # Data access layer (deprecated, migrating to services)
├── tests/               # Test suite (784 tests)
└── utilities/           # Helper scripts (refresh DB, migrations)

📊 Domain-Driven Design Layer Architecture

graph TB
    CLI[CLI Layer<br/>cli/] --> App[Application Layer<br/>application/]
    App --> Services[Services Layer<br/>services/]
    Services --> Domain[Domain Layer<br/>domain/]
    Services --> Infra[Infrastructure Layer<br/>infrastructure/]

    Domain --> Models[models.py<br/>schemas.py<br/>patterns.py]
    Services --> Enrich[enrichment/<br/>30+ services]
    Infra --> DB[database/<br/>clients/<br/>parsers/]

    CLI -.->|imports| App
    App -.->|imports| Services
    Services -.->|imports| Domain
    Services -.->|imports| Infra
    Domain -.->|no deps| None[❌ No External Deps]

    style Domain fill:#90EE90
    style Services fill:#FFD700
    style App fill:#87CEEB
    style CLI fill:#FFB6C1
    style Infra fill:#DDA0DD
    style None fill:#FF6B6B

Dependency Flow: Top layers depend on bottom layers, never the reverse.

Domain Layer (green): Pure business logic, no external dependencies.
  • Models: Event, Channel, Participant
  • Schemas: Pydantic validation
  • Patterns: Regex patterns for matching

Services Layer (yellow): Business logic orchestration.
  • Enrichment pipeline (7 handlers)
  • Caching services
  • Team name resolution
  • Provider config management

Application Layer (blue): High-level workflows.
  • EPG generation pipeline
  • Orchestration logic

CLI Layer (pink): Command-line interfaces.
  • run_provider.py
  • onboard_provider.py

Infrastructure Layer (purple): External dependencies.
  • Database repositories
  • API clients (TheSportsDB, ESPN)
  • M3U parsers

Key Rule: The domain layer has ZERO external dependencies and can be tested without a database, API, or file system. A minimal sketch of such a test follows.
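
💡 Illustrative Example - Testing the Domain Layer Without Mocks

A minimal sketch of a dependency-free domain test, using the Event model shown later in this document; the test function itself is illustrative, not actual EPGOAT test code.

from dataclasses import dataclass


@dataclass
class Event:
    """Sports event (pure data, no dependencies)."""
    id: int
    name: str
    date: str
    league: str
    home_team: str
    away_team: str


def test_event_is_pure_data():
    # No database, API, or file system needed; just construct and assert.
    event = Event(
        id=1,
        name="Lakers vs Celtics",
        date="2025-11-13",
        league="NBA",
        home_team="Lakers",
        away_team="Celtics",
    )
    assert event.league == "NBA"
    assert event.away_team == "Celtics"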

📊 Backend Package Structure (backend/epgoat/)

graph TD
    Root[backend/epgoat/] --> Domain[domain/]
    Root --> Services[services/]
    Root --> App[application/]
    Root --> Infra[infrastructure/]
    Root --> CLI[cli/]
    Root --> Tests[tests/]
    Root --> Utils[utilities/]

    Domain --> DomainFiles[models.py<br/>schemas.py<br/>patterns.py<br/>parsers.py<br/>xmltv.py<br/>config.py]

    Services --> ServicesFiles[30+ service files<br/>enrichment/<br/>caching/<br/>matching/]

    Infra --> InfraFiles[database/<br/>clients/<br/>parsers/]

    CLI --> CLIFiles[run_provider.py<br/>onboard_provider.py<br/>provider_runner/]

    Tests --> TestFiles[test_patterns.py<br/>test_parsers.py<br/>test_integration.py<br/>fixtures/]

    style Root fill:#FFD700
    style Domain fill:#90EE90
    style Services fill:#87CEEB
    style Infra fill:#DDA0DD
    style CLI fill:#FFB6C1

Navigation Guide:

  • domain/: Start here to understand core business logic
  • services/: Look here for matching, caching, enrichment
  • infrastructure/: External integrations (DB, APIs)
  • application/: High-level workflows (EPG generation)
  • cli/: Entry points for command-line tools
  • tests/: Unit, integration, and E2E tests
  • utilities/: Helper scripts (DB refresh, migrations)

Domain Layer: domain/

Purpose: Core business logic, no external dependencies

| File | Purpose | Lines |
|------|---------|-------|
| models.py | Core data models (Event, Channel, Participant, etc.) | ~400 |
| schemas.py | Pydantic validation schemas | ~300 |
| patterns.py | Channel name regex patterns | ~200 |
| parsers.py | Channel parsing logic (teams, times) | ~400 |
| xmltv.py | XMLTV XML generation | ~300 |
| config.py | Configuration loading | ~150 |
| provider_config.py | Provider-specific configuration models | ~200 |
| datetime_utils.py | Date/time parsing utilities | ~100 |

Key Models:
  • Event: Sports event (teams, league, date/time, venue)
  • Channel: IPTV channel (name, tvg-id, URL)
  • EnrichmentContext: Data passed through matching pipeline (sketched below)
  • ProviderConfig: Provider patterns, VOD filters, TVG-ID mappings
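
💡 Illustrative Sketch - EnrichmentContext

A minimal sketch of what EnrichmentContext might look like, inferred from how the pipeline code later in this document reads it (channel, matched_event, matched_by); any field beyond those three would be an assumption.

from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class EnrichmentContext:
    """Carries one channel through the 7-handler matching pipeline (sketch).

    Fields are inferred from the pipeline code shown later in this document;
    anything else the real class carries is not reflected here.
    """
    channel: Any                          # the Channel being matched
    matched_event: Optional[Any] = None   # set by the handler that matches
    matched_by: Optional[str] = None      # class name of the matching handler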

📖 Domain Layer Philosophy - Pure Business Logic

The Domain Layer is the heart of EPGOAT's architecture. It contains pure business logic with ZERO external dependencies.

Why Pure Domain Logic?

  1. Testability: No mocks needed for domain tests
     • Test the Event model without a database
     • Test Channel parsing without M3U files
     • Test XMLTV generation without a file system
     • Fast tests (milliseconds, not seconds)
  2. Portability: Domain logic works anywhere
     • Switch from Supabase to PostgreSQL? Domain unchanged.
     • Switch from TheSportsDB to ESPN? Domain unchanged.
     • Switch from Cloudflare to AWS? Domain unchanged.
  3. Understanding: Easy to learn the system
     • Start with domain models (Event, Channel, Participant)
     • No infrastructure noise (APIs, databases, file systems)
     • Pure Python (no external libraries)
  4. Longevity: Domain outlives infrastructure
     • APIs change, databases change, frameworks change
     • Business logic (matching, scheduling, XMLTV) is stable
     • The domain layer has the longest lifespan

EPGOAT Domain Files:

  • models.py: Core data structures (Event, Channel, M3UEntry)
  • schemas.py: Pydantic validation (EventSchema, ChannelSchema)
  • patterns.py: Channel regex patterns (ALLOWED_CHANNEL_PATTERNS)
  • parsers.py: Channel parsing logic (extract teams, times)
  • xmltv.py: XMLTV XML generation
  • config.py: Configuration loading
  • datetime_utils.py: Date/time parsing utilities

What's NOT in Domain Layer:

  • ❌ Database queries
  • ❌ API calls
  • ❌ File I/O
  • ❌ HTTP requests
  • ❌ Third-party libraries (except standard library)

Dependency Rule: Domain layer depends on NOTHING external. All other layers depend on domain layer.

Real Example:

# Domain layer (pure logic):
@dataclass
class Event:
    """Sports event (pure data, no dependencies)."""
    id: int
    name: str
    date: str
    league: str
    home_team: str
    away_team: str

# Infrastructure layer (uses the domain model):
class EventRepository:
    """Manages Event persistence (infrastructure dependency)."""
    def find_by_id(self, id: int) -> Event:
        # Database query (infrastructure)
        row = db.query("SELECT * FROM events WHERE id = ?", id)
        # Return domain model (pure)
        return Event(**row)

Benefits in Practice:

  • Domain tests run in 0.1s (no database setup)
  • Domain models used in CLI, web API, batch jobs (portable)
  • Business logic clear and focused
  • Infrastructure changes don't break domain

Services Layer: services/ (30+ Services)

Purpose: Business logic, orchestration, caching

📖 Services Layer - Business Logic Orchestration

The Services Layer orchestrates business logic by coordinating domain models and infrastructure components.

Services vs Domain:

| Domain Layer | Services Layer |
|--------------|----------------|
| Pure data structures | Business logic orchestration |
| No external dependencies | Uses infrastructure |
| Event, Channel, M3UEntry | EventService, MatchingService |
| Simple operations | Complex workflows |

Service Categories in EPGOAT:

1. Enrichment Services (Core matching logic)
  • EnrichmentPipeline: Orchestrates 7-handler chain
  • EnhancedMatchCache: Short-term match caching
  • CrossProviderCache: Learn from other providers
  • RegexMatcher: Multi-stage pattern matching
  • TeamNameResolutionService: LLM-based team name resolution

2. Provider Services (Provider management)
  • ProviderConfigManager: Load provider configs (YAML + DB)
  • ProviderOnboardingService: Auto-discover channel patterns
  • ProviderOrchestrator: Parallel provider processing

3. Data Services (Event/participant management)
  • EventDeduplicationService: Remove duplicate events (92% reduction)
  • ParticipantService: Manage teams/players
  • LeagueInferenceService: Infer leagues from channel families

4. Utility Services (Helper functionality)
  • CostTracker: Track LLM/API costs
  • LogoGenerator: Generate sport logos
  • R2StorageService: Upload to Cloudflare R2
  • MatchLearner: Self-learning from successful matches

Service Pattern:

class EventService:
    """Event management service (orchestrates domain + infrastructure)."""

    def __init__(
        self,
        repo: EventRepository,  # Infrastructure
        api: TheSportsDBClient   # Infrastructure
    ) -> None:
        self.repo = repo
        self.api = api

    def get_or_fetch_event(self, event_id: int) -> Event:
        """Get event from DB or fetch from API (orchestration)."""
        # Try repository first (cache):
        event = self.repo.find_by_id(event_id)
        if event:
            return event

        # Fall back to API:
        data = self.api.get_event(event_id)

        # Convert to domain model:
        event = Event(**data)

        # Save to repository:
        self.repo.create(event)

        return event

Why Services Layer?

  1. Reusability: Service methods used by CLI, API, batch jobs
  2. Testing: Mock infrastructure, test business logic
  3. Separation: Domain stays pure, infrastructure isolated
  4. Complexity: Hide infrastructure complexity from application layer

EPGOAT Example - Enrichment Pipeline:

The enrichment pipeline is a perfect service-layer component:
  • Uses domain models (Channel, Event)
  • Orchestrates infrastructure (database, API, cache)
  • Implements business logic (matching strategy)
  • Testable (mock handlers)
  • Reusable (CLI, API, batch jobs)

Enrichment Pipeline (Core Matching Logic)

services/enrichment/
├── pipeline.py                  # Orchestrates 7-handler chain
├── context.py                   # EnrichmentContext (data container)
├── factory.py                   # Handler factory (dependency injection)
├── handlers/                    # 7 matching handlers
│   ├── enhanced_match_cache_handler.py      # Stage 1: Cache lookup
│   ├── event_details_cache_handler.py       # Stage 2: Event details
│   ├── local_database_handler.py            # Stage 3: Local events DB
│   ├── regex_handler.py                     # Stage 4: Regex matching
│   ├── cross_provider_cache_handler.py      # Stage 5: Cross-provider
│   ├── api_handler.py                       # Stage 6: API calls
│   └── fallback_handler.py                  # Stage 7: LLM fallback
├── services/                    # Preprocessing services
│   ├── team_parsing_service.py              # Extract team names
│   ├── sport_detection_service.py           # Detect sport type
│   ├── league_inference_service.py          # Infer league from family
│   └── time_extraction_service.py           # Parse date/time
├── observers/                   # Cross-cutting concerns
│   └── cost_tracking_observer.py            # Track API costs
└── tests/                       # 20+ test files

📊 Enrichment Pipeline - 7 Handler Chain

flowchart TD
    Start[Channel Input] --> H1[Handler 1:<br/>Enhanced Match Cache]

    H1 -->|Cache HIT| Done[Return Match]
    H1 -->|Cache MISS| H2[Handler 2:<br/>Event Details Cache]

    H2 -->|Cache HIT| Done
    H2 -->|Cache MISS| H3[Handler 3:<br/>Local Database Lookup]

    H3 -->|DB HIT| Done
    H3 -->|DB MISS| H4[Handler 4:<br/>Regex Matcher]

    H4 -->|Match FOUND| Done
    H4 -->|No Match| H5[Handler 5:<br/>Cross-Provider Cache]

    H5 -->|Cache HIT| Done
    H5 -->|Cache MISS| H6[Handler 6:<br/>API Handler]

    H6 -->|API HIT| Done
    H6 -->|API MISS| H7[Handler 7:<br/>LLM Fallback]

    H7 -->|LLM Match| Done
    H7 -->|No Match| NoMatch[No Match Found]

    style H1 fill:#90EE90
    style H2 fill:#90EE90
    style H3 fill:#FFD700
    style H4 fill:#87CEEB
    style H5 fill:#FFB6C1
    style H6 fill:#DDA0DD
    style H7 fill:#FF6B6B
    style Done fill:#32CD32
    style NoMatch fill:#DC143C

Chain of Responsibility Pattern: Each handler tries to match the channel. If it can't, it passes to the next handler.

Handler Priorities (fastest → slowest):

  1. Enhanced Match Cache (O(1) lookup, 0ms)
     • Same-day re-processing optimization
     • 95%+ hit rate for multiple runs per day
  2. Event Details Cache (O(1) lookup, 0ms)
     • Cached event details from previous lookups
  3. Local Database (SQL query, 10-50ms)
     • Query events table for date/league match
  4. Regex Matcher (pattern matching, 1-5ms)
     • Multi-stage: exact → fuzzy → team extraction
  5. Cross-Provider Cache (O(1) lookup, 0ms)
     • Learn from other providers (shared matches)
  6. API Handler (HTTP request, 100-500ms)
     • TheSportsDB API call (costs money + time)
  7. LLM Fallback (Claude API, 1000-3000ms)
     • Last resort for complex matches (costs $$$ + time)

Performance: 90% of channels matched in handlers 1-4 (< 50ms each)

📖 Chain of Responsibility Pattern - Enrichment Pipeline

The Enrichment Pipeline uses the Chain of Responsibility pattern to implement a flexible, extensible matching strategy.

Pattern Overview:

Chain of Responsibility passes a request through a chain of handlers. Each handler decides whether to process the request or pass it to the next handler.

Benefits:

  1. Flexibility: Add/remove handlers without changing pipeline
  2. Priority: Handlers run in priority order (cache → DB → API)
  3. Early Exit: Stop when match found (no wasted processing)
  4. Testability: Test each handler independently
  5. Monitoring: Track which handler found match

EPGOAT Implementation:

from typing import List, Protocol


class EnrichmentHandler(Protocol):
    """Protocol for enrichment handlers (Open/Closed Principle)."""

    def handle(self, context: EnrichmentContext) -> EnrichmentContext:
        """Process enrichment context.

        Args:
            context: Current enrichment context

        Returns:
            Updated context (with match if found)
        """
        ...

class EnrichmentPipeline:
    """Orchestrates chain of handlers."""

    def __init__(self, handlers: List[EnrichmentHandler]) -> None:
        self.handlers = handlers

    def enrich(self, channel: Channel) -> EnrichmentContext:
        """Run channel through handler chain."""
        context = EnrichmentContext(channel=channel)

        for handler in self.handlers:
            context = handler.handle(context)

            # Early exit if match found:
            if context.matched_event:
                context.matched_by = handler.__class__.__name__
                break

        return context

Handler Chain (priority order):

  1. EnhancedMatchCacheHandler: O(1) cache lookup (0ms)
  2. EventDetailsCacheHandler: Cached event details (0ms)
  3. LocalDatabaseHandler: SQL query (10-50ms)
  4. RegexHandler: Pattern matching (1-5ms)
  5. CrossProviderCacheHandler: Shared cache (0ms)
  6. APIHandler: HTTP request (100-500ms)
  7. FallbackHandler: LLM call (1000-3000ms)

Performance Optimization:

  • 90% of channels matched in handlers 1-4 (< 50ms)
  • Expensive handlers (6-7) rarely used
  • Each handler adds minimal overhead if not matched

Adding New Handler:

# 1. Implement EnrichmentHandler protocol:
class NewHandler:
    def handle(self, context: EnrichmentContext) -> EnrichmentContext:
        if context.matched_event:
            return context  # Already matched, skip

        # Your matching logic here:
        match = your_matching_logic(context.channel)

        if match:
            context.matched_event = match

        return context

# 2. Add to factory:
class HandlerFactory:
    def create_handlers(self) -> List[EnrichmentHandler]:
        return [
            EnhancedMatchCacheHandler(),
            NewHandler(),  # Insert in priority order
            LocalDatabaseHandler(),
            # ...
        ]

# 3. Done! No changes to pipeline code (Open/Closed Principle)

Monitoring Example:

Channel: "NBA 01: Lakers vs Celtics"
├─ EnhancedMatchCacheHandler: MISS (0ms)
├─ EventDetailsCacheHandler: MISS (0ms)
├─ LocalDatabaseHandler: HIT (25ms) ✅
└─ Matched by: LocalDatabaseHandler

Channel: "NFL 05: Patriots vs Cowboys"
├─ EnhancedMatchCacheHandler: HIT (0ms) ✅
└─ Matched by: EnhancedMatchCacheHandler

Key Takeaway: Chain of Responsibility makes EPGOAT's matching strategy flexible, fast, and easy to extend.

📊 EPG Generation Data Flow

flowchart LR
    M3U[M3U Playlist] --> Parser[M3U Parser]
    Parser --> VOD[VOD Filter<br/>91.7% reduction]
    VOD --> Channels[Channel List]

    Channels --> Enrich[Enrichment Pipeline<br/>7 handlers]

    DB[(Events DB)] --> Enrich
    API[TheSportsDB API] --> Enrich
    Cache[Match Cache] --> Enrich

    Enrich --> Matches[Matched Events]

    Matches --> Schedule[Schedule Builder<br/>Pre/Live/Post blocks]
    Schedule --> XMLTV[XMLTV Generator]
    XMLTV --> Output[EPG XML File]

    Matches --> SaveDB[Save to Database]

    style M3U fill:#E8F5E9
    style Output fill:#E8F5E9
    style DB fill:#FFF3E0
    style API fill:#E1F5FE
    style Cache fill:#F3E5F5

Complete EPG Generation Flow:

  1. Input: M3U playlist from provider URL
  2. Parse: Extract channel metadata (name, tvg-id, group)
  3. Filter: Remove VOD channels (91.7% reduction)
  4. Enrich: Match channels to events (7-handler pipeline)
  5. Schedule: Generate programme blocks (pre/live/post)
  6. Output: XMLTV XML file for IPTV players
  7. Save: Store matches in database for caching

Performance: 100-500 channels processed in 30-120 seconds. A code sketch of this flow follows.
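
💡 Illustrative Sketch - EPG Generation Flow in Code

The flow above, condensed into straight-line Python. All injected callables are hypothetical stand-ins for the real parser, VOD detector, scheduler, and XMLTV modules named elsewhere in this document.

from typing import Callable


def generate_epg_sketch(
    m3u_text: str,
    parse_m3u: Callable,        # hypothetical: M3U text -> list of channels
    is_vod: Callable,           # hypothetical: channel -> bool
    pipeline,                   # EnrichmentPipeline (shown earlier)
    build_schedule: Callable,   # hypothetical: context -> programme blocks
    write_xmltv: Callable,      # hypothetical: programmes -> XML string
) -> str:
    """Steps 1-7 of the flow above as straight-line code (sketch)."""
    channels = parse_m3u(m3u_text)                         # 1-2. input + parse
    live = [ch for ch in channels if not is_vod(ch)]       # 3. VOD filter
    contexts = [pipeline.enrich(ch) for ch in live]        # 4. enrichment
    matched = [ctx for ctx in contexts if ctx.matched_event]
    programmes = [build_schedule(ctx) for ctx in matched]  # 5. scheduling
    # 7. persisting matches to the database is omitted in this sketch
    return write_xmltv(programmes)                         # 6. XMLTV output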

Key Services

| Service | Purpose | File |
|---------|---------|------|
| Provider Config Manager | Load provider configs (YAML cache + DB) | provider_config_manager.py |
| Provider Onboarding | Auto-discover channel patterns | provider_onboarding_service.py |
| Provider Orchestrator | Parallel provider processing | provider_orchestrator.py |
| Team Name Resolution | LLM-based canonical team names | team_name_resolution_service.py |
| Enhanced League Inference | Map families to leagues | enhanced_league_inference.py |
| Enhanced Match Cache | High-performance match cache | enhanced_match_cache.py |
| Cross-Provider Cache | Learn from other providers | cross_provider_cache.py |
| Event Deduplication | Remove duplicate events | event_deduplication.py |
| Regex Matcher | Multi-stage pattern matching | regex_matcher.py |
| API Enrichment | External API calls (deprecated, replaced by enrichment/) | api_enrichment.py |
| Cost Tracker | Track LLM/API costs | cost_tracker.py |
| Logo Generator | Generate sport logos | logo_generator.py |
| R2 Storage | Cloudflare R2 uploads | core/r2_storage.py |
| GitHub Issue Service | Auto-create validation issues | integrations/github_issue_service.py |
| Match Learner | Self-learning from successful matches | match_learner.py |
| Schedulers | Generate EPG programme blocks | schedulers.py |

Infrastructure Layer: infrastructure/

Purpose: External dependencies (databases, APIs, parsers)

Database: infrastructure/database/

database/
├── connection.py               # Supabase PostgreSQL connection
├── migration_runner.py         # Run SQL migrations
├── migrate.py                  # Migration CLI
├── migrations/                 # 17 numbered SQL migrations
│   ├── 001_initial_schema.sql
│   ├── 017_simplify_team_discovery.sql  # Latest (2025-11-07)
│   └── ...
├── repositories/               # Data access patterns
│   ├── event_repository.py             # Event CRUD
│   ├── participant_repository.py       # Participant CRUD
│   ├── unmatched_channel_repository.py # Unmatched channels
│   ├── supabase_event_repository.py    # Supabase-specific implementation
│   └── base_repository.py              # Base class with soft deletes
└── postgrest_cache.py         # PostgREST caching layer

💡 Repository Pattern - BaseRepository Example

"""Base Repository class with common CRUD operations.

All repository classes should inherit from this to get standard database operations.
Implements soft delete pattern (Core Principle #4: Data is Forever).
"""

from typing import Any, List, Optional


# Whitelist of allowed table names (SQL injection prevention)
ALLOWED_TABLES = {
    "events",
    "participants",
    "unmatched_channels",
    "match_cache",
    "providers",
}


class BaseRepository:
    """Base repository with soft delete support.

    Provides standard CRUD operations for all repositories.
    Enforces soft delete pattern (never hard delete).

    Example:
        >>> class EventRepository(BaseRepository):
        ...     def __init__(self, connection):
        ...         super().__init__(connection, "events")
        ...
        >>> repo = EventRepository(db_connection)
        >>> event = repo.find_by_id(123)
        >>> repo.delete(123)  # Soft delete (sets record_status='deleted')
    """

    def __init__(self, connection: Any, table_name: str) -> None:
        """Initialize repository with database connection.

        Args:
            connection: Database connection object
            table_name: Name of database table

        Raises:
            ValueError: If table_name not in ALLOWED_TABLES (SQL injection prevention)
        """
        if table_name not in ALLOWED_TABLES:
            raise ValueError(
                f"Table '{table_name}' not in ALLOWED_TABLES. "
                f"Add to whitelist if this is a valid table."
            )

        self.connection = connection
        self.table_name = table_name

    def find_by_id(self, id: int) -> Optional[Any]:
        """Find record by ID (only active records).

        Args:
            id: Record ID

        Returns:
            Record if found and active, None otherwise
        """
        query = f"""
            SELECT * FROM {self.table_name}
            WHERE id = ?
              AND record_status = 'active'
        """
        return self._query_one(query, id)

    def find_all(self, limit: int = 100) -> List[Any]:
        """Find all active records.

        Args:
            limit: Maximum number of records to return

        Returns:
            List of active records
        """
        query = f"""
            SELECT * FROM {self.table_name}
            WHERE record_status = 'active'
            ORDER BY created_at DESC
            LIMIT ?
        """
        return self._query(query, limit)

    def create(self, data: dict) -> int:
        """Create new record.

        Args:
            data: Record data as dictionary

        Returns:
            ID of created record
        """
        columns = ", ".join(data.keys())
        placeholders = ", ".join("?" * len(data))
        query = f"""
            INSERT INTO {self.table_name} ({columns}, record_status, created_at)
            VALUES ({placeholders}, 'active', CURRENT_TIMESTAMP)
        """
        cursor = self._execute(query, *data.values())
        return cursor.lastrowid

    def update(self, id: int, data: dict) -> None:
        """Update existing record.

        Args:
            id: Record ID
            data: Fields to update
        """
        set_clause = ", ".join(f"{k} = ?" for k in data.keys())
        query = f"""
            UPDATE {self.table_name}
            SET {set_clause}, updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
              AND record_status = 'active'
        """
        self._execute(query, *data.values(), id)

    def delete(self, id: int) -> None:
        """Soft delete record (Core Principle #4: Data is Forever).

        NEVER hard deletes. Sets record_status='deleted' instead.

        Args:
            id: Record ID to delete
        """
        query = f"""
            UPDATE {self.table_name}
            SET record_status = 'deleted',
                updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
        """
        self._execute(query, id)

    def _query(self, query: str, *params) -> List[Any]:
        """Execute query and return all results."""
        cursor = self.connection.execute(query, params)
        return cursor.fetchall()

    def _query_one(self, query: str, *params) -> Optional[Any]:
        """Execute query and return single result."""
        cursor = self.connection.execute(query, params)
        return cursor.fetchone()

    def _execute(self, query: str, *params) -> Any:
        """Execute query without returning results."""
        return self.connection.execute(query, params)

Repository Pattern Benefits:

  1. Abstraction: Hide database implementation details
  2. Testability: Easy to mock for tests
  3. Consistency: All repositories use same patterns
  4. Soft Deletes: Enforced at base class level
  5. SQL Injection Prevention: Table name whitelist

Usage in EPGOAT:
  • EventRepository: Manages the events table
  • ParticipantRepository: Manages the participants table
  • UnmatchedChannelRepository: Manages the unmatched_channels table

All inherit from BaseRepository for consistent behavior.

💡 Real EPGOAT Class Example - EnhancedMatchCache

#!/usr/bin/env python3

"""Enhanced Match Cache (P2-012) - Layer 2.

Provides short-term match caching with dual lookup strategy (tvg-id OR channel_name)
for same-day re-processing optimization.

Key Features:
- Dual lookup strategy: Priority 1 (tvg-id), Priority 2 (channel_name)
- Cache expiration: 24-48h TTL
- Same-day re-processing optimization (6am → 12pm → 6pm)
- Metrics tracking for cache hit rate analysis
"""

from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, Optional


@dataclass
class CachedMatch:
    """Cached match result with dual identifiers.

    Attributes:
        tvg_id: TVG-ID from M3U (if available)
        channel_name: Channel display name
        channel_family: Channel family/category (e.g., "NBA", "NFL")
        channel_payload: Extracted payload (e.g., "Lakers vs Celtics")
        matched_event_id: ID of matched event from API
        league: League identifier (e.g., "NBA", "Premier League")
        sport: Sport name (e.g., "Basketball", "Soccer")
        confidence: Match confidence score (0.0-1.0)
        cached_at: When this match was cached
    """

    tvg_id: str | None
    channel_name: str
    channel_family: str
    channel_payload: str
    matched_event_id: int
    league: str
    sport: str
    confidence: float
    cached_at: datetime = field(default_factory=datetime.now)


class EnhancedMatchCache:
    """Enhanced match cache with dual lookup strategy.

    Provides short-term caching (24-48h) for same-day re-processing optimization.
    Uses dual lookup strategy: tvg-id (priority 1) OR channel_name (priority 2).

    Benefits:
    - 95%+ cache hit rate for same-day re-processing
    - Reduces API calls for multiple EPG generations per day
    - Supports providers with and without tvg-ids
    - Automatic expiration prevents stale data

    Example:
        >>> cache = EnhancedMatchCache(expiration_hours=24)
        >>>
        >>> # Store match:
        >>> cache.store_match(
        ...     tvg_id="nba-lakers-123",
        ...     channel_name="NBA 01: Lakers vs Celtics",
        ...     channel_family="NBA",
        ...     channel_payload="Lakers vs Celtics",
        ...     matched_event_id=12345,
        ...     league="NBA",
        ...     sport="Basketball",
        ...     confidence=0.95
        ... )
        >>>
        >>> # Find match (fast lookup):
        >>> result = cache.find_match(tvg_id="nba-lakers-123")
        >>> result.matched_event_id
        12345
    """

    def __init__(self, expiration_hours: int = 24) -> None:
        """Initialize cache with expiration time.

        Args:
            expiration_hours: How long to cache matches (default: 24 hours)
        """
        self._cache: Dict[str, CachedMatch] = {}
        self._expiration_hours = expiration_hours
        self._hits = 0
        self._misses = 0

    def find_match(
        self,
        tvg_id: Optional[str] = None,
        channel_name: Optional[str] = None
    ) -> Optional[CachedMatch]:
        """Find cached match by tvg-id or channel name.

        Dual lookup strategy:
        1. Try tvg-id first (faster, more reliable)
        2. Fall back to channel_name (works without tvg-id)

        Args:
            tvg_id: TVG-ID to lookup (priority 1)
            channel_name: Channel name to lookup (priority 2)

        Returns:
            CachedMatch if found and not expired, None otherwise
        """
        # Try tvg-id first (priority 1):
        if tvg_id:
            match = self._cache.get(f"tvg:{tvg_id}")
            if match and not self._is_expired(match):
                self._hits += 1
                return match

        # Fall back to channel name (priority 2):
        if channel_name:
            match = self._cache.get(f"name:{channel_name}")
            if match and not self._is_expired(match):
                self._hits += 1
                return match

        self._misses += 1
        return None

    def store_match(
        self,
        tvg_id: Optional[str],
        channel_name: str,
        channel_family: str,
        channel_payload: str,
        matched_event_id: int,
        league: str,
        sport: str,
        confidence: float
    ) -> None:
        """Store match in cache with dual keys.

        Stores under both tvg-id and channel_name for dual lookup support.

        Args:
            tvg_id: TVG-ID from M3U (optional)
            channel_name: Channel display name (required)
            channel_family: Channel family (e.g., "NBA")
            channel_payload: Extracted payload (e.g., "Lakers vs Celtics")
            matched_event_id: ID of matched event
            league: League identifier
            sport: Sport name
            confidence: Match confidence (0.0-1.0)
        """
        match = CachedMatch(
            tvg_id=tvg_id,
            channel_name=channel_name,
            channel_family=channel_family,
            channel_payload=channel_payload,
            matched_event_id=matched_event_id,
            league=league,
            sport=sport,
            confidence=confidence
        )

        # Store under tvg-id if available:
        if tvg_id:
            self._cache[f"tvg:{tvg_id}"] = match

        # Always store under channel name:
        self._cache[f"name:{channel_name}"] = match

    def _is_expired(self, match: CachedMatch) -> bool:
        """Check if cached match has expired.

        Args:
            match: Cached match to check

        Returns:
            True if expired, False otherwise
        """
        age = datetime.now() - match.cached_at
        return age > timedelta(hours=self._expiration_hours)

    def get_hit_rate(self) -> float:
        """Calculate cache hit rate.

        Returns:
            Hit rate as percentage (0.0-1.0)
        """
        total = self._hits + self._misses
        return self._hits / total if total > 0 else 0.0

Key Patterns Demonstrated:

  1. Type Hints: 100% coverage (every parameter and return)
  2. Docstrings: Google style for all public methods
  3. Dataclasses: Clean data structures with defaults
  4. Private Methods: _is_expired() marked with underscore
  5. Dual Lookup: Flexible strategy (tvg-id OR channel_name)
  6. Metrics: Track hits/misses for performance analysis
  7. Expiration: Automatic cache invalidation

Real Performance:
  • 95%+ hit rate for same-day re-processing
  • O(1) lookup time (dict lookup)
  • Saves 100+ API calls per run (100-500ms each)

API Clients: infrastructure/clients/

| Client | Purpose | File |
|--------|---------|------|
| TheSportsDB | Primary event database | api_client.py |
| ESPN API | Fallback event data | espn_api_client.py |
| TV Schedule API | Alternate event source | tv_schedule_client.py |
| Tracked Clients | API call tracking wrapper | tracked_api_clients.py |

Parsers: infrastructure/parsers/

| Parser | Purpose | File |
|--------|---------|------|
| Provider M3U Parser | Parse M3U with provider-specific logic | provider_m3u_parser.py |
| VOD Detector | Filter VOD channels (91.7% reduction) | vod_detector.py |
| Channel Parser | Extract channel metadata | channel_parser.py |
| Event Matcher | Match channels to events (deprecated) | event_matcher.py |
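
💡 Illustrative Sketch - Keyword-Based VOD Filtering

The actual vod_detector.py logic is not shown in this document, so the markers and helper below are assumptions; the sketch only illustrates why filtering before enrichment pays off.

import re

# Hypothetical markers; the real vod_detector.py rules are not shown here.
VOD_MARKERS = re.compile(r"\b(VOD|MOVIE|SERIES|24/7|REPLAY)\b", re.IGNORECASE)


def is_vod(channel_name: str) -> bool:
    """Return True if the channel name looks like VOD content (sketch)."""
    return bool(VOD_MARKERS.search(channel_name))


# Filtering most entries up front keeps the enrichment pipeline cheap:
channels = ["NBA 01: Lakers vs Celtics", "MOVIE: Heat (1995)"]
live = [name for name in channels if not is_vod(name)]
# live == ["NBA 01: Lakers vs Celtics"]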

Application Layer: application/

Purpose: High-level workflows, orchestration

| File | Purpose |
|------|---------|
| epg_generator.py | Main EPG generation pipeline |

CLI Layer: cli/

Purpose: Command-line interfaces

| Script | Purpose | Command |
|--------|---------|---------|
| run_provider.py | Generate EPG for one provider | python cli/run_provider.py --provider tps |
| onboard_provider.py | Onboard new provider (pattern discovery) | python cli/onboard_provider.py --provider necro |
| provider_runner/ | Provider execution framework (used by run_provider.py) | — |

Utilities: utilities/

Purpose: Helper scripts for manual operations

| Script | Purpose |
|--------|---------|
| refresh_event_db_v2.py | Refresh events from TheSportsDB |
| refresh_leagues.py | Refresh league data |
| analyze_tps_m3u.py | Analyze M3U playlist |
| migrate_database.py | Run database migrations |

Run from: backend/epgoat/ directory


Configuration: backend/config/

Purpose: YAML configuration files

Provider Configs: config/providers/

providers/
└── tps.yml                 # TPS provider config (72 patterns, VOD filters, TVG-IDs)
                            # Auto-generated from database by provider_config_manager.py
                            # 24-hour cache TTL

📊 Provider Configuration Loading Flow

flowchart TD
    Request[Load Provider Config] --> Cache{YAML Cache Exists?}

    Cache -->|Yes| Age{Cache < 24h old?}
    Age -->|Yes| LoadYAML[Load from YAML<br/>⚡ 53x faster]
    Age -->|No| FetchDB

    Cache -->|No| FetchDB[Fetch from Database<br/>🐢 Slower]

    FetchDB --> BuildYAML[Build YAML Structure]
    BuildYAML --> WriteCache[Write YAML Cache]
    WriteCache --> LoadYAML

    LoadYAML --> Config[ProviderConfig Object]

    style LoadYAML fill:#90EE90
    style FetchDB fill:#FFD700
    style Config fill:#87CEEB

Hybrid Configuration Strategy:

  • Database: Source of truth (patterns, VOD filters, TVG-IDs)
  • YAML Cache: Performance optimization (53x faster loading)
  • 24-hour TTL: Balances freshness against performance

File Location: backend/config/providers/<provider_slug>.yml

Example: The TPS provider config has 72 patterns, VOD filters, and TVG-ID mappings.

Benefits:
  • Database changes auto-sync to YAML within 24 hours
  • Fast loads during development (YAML)
  • Single source of truth (database)

A sketch of the TTL check follows.
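
💡 Illustrative Sketch - 24-Hour YAML Cache TTL

A minimal sketch of the TTL decision in the flow above, assuming PyYAML and a hypothetical fetch_from_db callable in place of the real provider_config_manager.py internals.

from pathlib import Path
import time

import yaml  # PyYAML

CACHE_TTL_SECONDS = 24 * 60 * 60  # the 24-hour TTL from the flow above


def load_provider_config(slug: str, fetch_from_db) -> dict:
    """Prefer a fresh YAML cache; rebuild from the DB when stale (sketch).

    fetch_from_db is a hypothetical callable standing in for the real
    database query in provider_config_manager.py.
    """
    cache_path = Path(f"backend/config/providers/{slug}.yml")

    # Fast path: YAML cache exists and is younger than the TTL.
    if cache_path.exists():
        age = time.time() - cache_path.stat().st_mtime
        if age < CACHE_TTL_SECONDS:
            return yaml.safe_load(cache_path.read_text())

    # Slow path: database is the source of truth; rewrite the cache.
    config = fetch_from_db(slug)
    cache_path.write_text(yaml.dump(config))
    return config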

Global Configs

| File | Purpose | Count |
|------|---------|-------|
| sport_emojis.yml | Sport → emoji mappings | 89 sports |
| sport_categories.yml | Sport → XMLTV category mappings | 77 sports |
| channel_patterns.yml | Global channel regex patterns | ~50 patterns |
| matching_config.yml | Matching pipeline configuration | N/A |
| api_config.yml | API client configuration | N/A |
| family_mappings/universal.yml | Universal family-league mappings | N/A |
| family_mappings/tps.yml | TPS-specific family-league mappings | N/A |
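
💡 Illustrative Sketch - Loading a Global Config

A sketch of loading one of these YAML files with PyYAML; that sport_emojis.yml is a flat sport → emoji mapping is an assumption inferred from the table above.

import yaml  # PyYAML


def load_sport_emojis(path: str = "backend/config/sport_emojis.yml") -> dict:
    """Load sport -> emoji mappings (assumes a flat key: value YAML file)."""
    with open(path, encoding="utf-8") as fh:
        return yaml.safe_load(fh)


# Usage (the Basketball entry is assumed for illustration):
# emojis = load_sport_emojis()
# emojis.get("Basketball")  # e.g. a basketball emoji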

Tests: tests/

Purpose: Test suite (pytest)

tests/
├── test_patterns.py            # Pattern matching tests
├── test_parsers.py             # M3U parsing tests
├── test_schedulers.py          # Programme scheduling tests
├── test_config.py              # Configuration loading tests
├── test_schemas.py             # Schema validation tests
├── test_integration.py         # End-to-end workflow tests
├── conftest.py                 # Pytest fixtures
└── fixtures/                   # Test data
    ├── sample.m3u              # Sample playlist
    └── 2025-11-01-channel-names-1386.csv  # Real TPS channels

Run: make test (from project root) or pytest (from backend/epgoat/)

Pass rate: 98.2% (the 14 failures are due to cache bugs, not restructure issues)

📊 Test Suite Organization

graph TD
    Tests[tests/] --> Unit[Unit Tests<br/>~700 tests]
    Tests --> Integration[Integration Tests<br/>~50 tests]
    Tests --> E2E[End-to-End Tests<br/>~10 tests]

    Unit --> UnitFiles[test_patterns.py<br/>test_parsers.py<br/>test_models.py<br/>test_schemas.py]

    Integration --> IntFiles[test_enrichment.py<br/>test_services.py<br/>test_caching.py]

    E2E --> E2EFiles[test_integration.py<br/>test_epg_generation.py]

    Tests --> Fixtures[fixtures/<br/>conftest.py]
    Fixtures --> FixFiles[sample.m3u<br/>sample_events.json<br/>test_providers.yml]

    style Unit fill:#90EE90
    style Integration fill:#FFD700
    style E2E fill:#FFB6C1
    style Fixtures fill:#87CEEB

Test Pyramid Distribution:

  • Unit Tests (90%): Fast, isolated, focused
    • Test individual functions/classes
    • Mock external dependencies
    • Run in milliseconds
  • Integration Tests (7%): Medium speed, multi-component
    • Test component interactions
    • Use real services (with mocked externals)
    • Run in seconds
  • End-to-End Tests (3%): Slow, comprehensive
    • Test complete workflows
    • Minimal mocking
    • Run in 10-30 seconds each

Run Commands:
  • pytest tests/test_patterns.py (specific file)
  • pytest tests/test_patterns.py::test_nba_pattern (specific test)
  • pytest -k "match" (tests matching "match")
  • pytest -v (verbose output)

A conftest.py fixture sketch is shown below.
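
💡 Illustrative Sketch - conftest.py Fixture

A minimal sketch of how conftest.py might expose the sample.m3u fixture to tests; the fixture name and test body are illustrative, not actual EPGOAT test code.

# conftest.py (sketch)
from pathlib import Path

import pytest


@pytest.fixture
def sample_m3u() -> str:
    """Load the sample playlist from tests/fixtures/."""
    return (Path(__file__).parent / "fixtures" / "sample.m3u").read_text()


# In a test module (illustrative; assumes the fixture file is a valid playlist):
def test_playlist_has_extm3u_header(sample_m3u):
    assert sample_m3u.startswith("#EXTM3U")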


Common File Locations

When working on...

| Task | Look Here |
|------|-----------|
| Adding new sport | config/sport_emojis.yml, config/sport_categories.yml |
| Adding channel pattern | config/channel_patterns.yml or provider config |
| Modifying match logic | services/enrichment/handlers/ |
| Adding new handler | services/enrichment/handlers/ + factory.py |
| Database queries | infrastructure/database/repositories/ |
| Schema changes | infrastructure/database/migrations/ (new numbered file) |
| API integration | infrastructure/clients/ |
| M3U parsing | infrastructure/parsers/provider_m3u_parser.py |
| XMLTV generation | domain/xmltv.py |
| Team name resolution | services/team_name_resolution_service.py |
| Provider onboarding | services/provider_onboarding_service.py |
| Configuration loading | services/provider_config_manager.py |

💡 Finding Files - Common Tasks

# ============================================================
# Task: Add new sport emoji
# ============================================================
# File: backend/config/sport_emojis.yml
# Add: Cricket: 🏏

# ============================================================
# Task: Add channel pattern for new league
# ============================================================
# Option 1: Provider-specific (preferred)
# File: backend/config/providers/tps.yml
# Add to patterns section:
#   - pattern: '^IPL\s+\d+\s*:?'
#     sport_family: 'IPL'
#     priority: 100

# Option 2: Global patterns
# File: backend/config/channel_patterns.yml
# Add to patterns list

# ============================================================
# Task: Modify match logic
# ============================================================
# Files: backend/epgoat/services/enrichment/handlers/
# - regex_handler.py (pattern matching)
# - api_handler.py (API calls)
# - local_database_handler.py (DB lookups)

# ============================================================
# Task: Add new database table
# ============================================================
# Step 1: Create migration
cd backend/epgoat/infrastructure/database/migrations
# Create: 018_add_new_table.sql

# Step 2: Run migration
cd ../
python migration_runner.py

# Step 3: Create repository
cd repositories/
# Create: new_table_repository.py (inherit from BaseRepository)

# ============================================================
# Task: Add tests for new feature
# ============================================================
# Unit tests: backend/epgoat/tests/test_<module_name>.py
# Integration tests: backend/epgoat/tests/test_integration.py

# ============================================================
# Task: Debug EPG generation issue
# ============================================================
# Entry point: backend/epgoat/cli/run_provider.py
# Pipeline: backend/epgoat/application/epg_generator.py
# Handlers: backend/epgoat/services/enrichment/handlers/
# Output: backend/epgoat/output/<provider>_<date>.xml

# ============================================================
# Task: Update provider configuration
# ============================================================
# Database: Update provider_patterns table (source of truth)
# YAML cache: backend/config/providers/<provider>.yml (auto-generated)
# Manager: backend/epgoat/services/provider_config_manager.py

# ============================================================
# Task: Check current database schema
# ============================================================
# Current schema: backend/epgoat/infrastructure/database/migrations/017_*.sql
# Base repository: backend/epgoat/infrastructure/database/repositories/base_repository.py
# All repositories: backend/epgoat/infrastructure/database/repositories/

File Organization Principles:

  1. Configuration: backend/config/ for YAML files
  2. Business Logic: backend/epgoat/services/ for services
  3. Data Models: backend/epgoat/domain/ for domain models
  4. Database: backend/epgoat/infrastructure/database/
  5. CLI Tools: backend/epgoat/cli/ for command-line scripts
  6. Tests: backend/epgoat/tests/ for test suite

Quick Find Commands:

# Find by filename:
find backend/epgoat -name "event_repository.py"

# Find by content:
grep -r "EnhancedMatchCache" backend/epgoat/

# Find by pattern:
find backend/epgoat -name "*cache*.py"

Import Patterns

Example imports (follow these patterns):

# Domain layer (no external deps)
from backend.epgoat.domain.models import Event, Channel
from backend.epgoat.domain.schemas import EventSchema
from backend.epgoat.domain.patterns import ALLOWED_CHANNEL_PATTERNS

# Services layer
from backend.epgoat.services.enhanced_match_cache import EnhancedMatchCache
from backend.epgoat.services.enrichment.pipeline import EnrichmentPipeline
from backend.epgoat.services.provider_config_manager import ProviderConfigManager

# Infrastructure layer
from backend.epgoat.infrastructure.database.connection import get_database_connection
from backend.epgoat.infrastructure.clients.api_client import TheSportsDBClient
from backend.epgoat.infrastructure.parsers.provider_m3u_parser import ProviderM3UParser

# Application layer
from backend.epgoat.application.epg_generator import generate_epg

💡 Import Patterns - By Layer

# ============================================================
# Domain Layer Imports (NO external dependencies)
# ============================================================
from backend.epgoat.domain.models import (
    Event,
    Channel,
    M3UEntry,
    Participant,
    create_channel
)
from backend.epgoat.domain.schemas import (
    EventSchema,
    ChannelSchema,
    ProviderConfigSchema
)
from backend.epgoat.domain.patterns import ALLOWED_CHANNEL_PATTERNS
from backend.epgoat.domain.xmltv import generate_xmltv

# ============================================================
# Services Layer Imports
# ============================================================
from backend.epgoat.services.enhanced_match_cache import EnhancedMatchCache
from backend.epgoat.services.cross_provider_cache import CrossProviderCache
from backend.epgoat.services.enrichment.pipeline import EnrichmentPipeline
from backend.epgoat.services.enrichment.factory import HandlerFactory
from backend.epgoat.services.provider_config_manager import ProviderConfigManager
from backend.epgoat.services.team_name_resolution_service import TeamNameResolutionService

# ============================================================
# Infrastructure Layer Imports
# ============================================================
from backend.epgoat.infrastructure.database.connection import get_database_connection
from backend.epgoat.infrastructure.database.repositories.event_repository import EventRepository
from backend.epgoat.infrastructure.clients.api_client import TheSportsDBClient
from backend.epgoat.infrastructure.parsers.provider_m3u_parser import ProviderM3UParser

# ============================================================
# Application Layer Imports
# ============================================================
from backend.epgoat.application.epg_generator import generate_epg

# ============================================================
# Usage Example - Dependency Injection
# ============================================================
def create_epg_pipeline(provider_slug: str) -> EnrichmentPipeline:
    """Create EPG pipeline with all dependencies (DDD pattern).

    Demonstrates proper dependency injection and layer separation.
    """
    # Infrastructure layer (external dependencies):
    db_connection = get_database_connection()
    api_client = TheSportsDBClient(api_key=get_api_key())
    event_repo = EventRepository(connection=db_connection)

    # Services layer (business logic):
    config_manager = ProviderConfigManager(connection=db_connection)
    match_cache = EnhancedMatchCache(expiration_hours=24)
    cross_cache = CrossProviderCache(expiration_hours=48)

    # Create handler factory:
    factory = HandlerFactory(
        event_repo=event_repo,
        api_client=api_client,
        match_cache=match_cache,
        cross_cache=cross_cache
    )

    # Application layer (orchestration):
    pipeline = EnrichmentPipeline(
        handlers=factory.create_handlers(),
        config_manager=config_manager
    )

    return pipeline

Import Guidelines:

  1. Absolute Imports: Always use from backend.epgoat.X import Y
     • Never use relative imports (from ..models import Event)
     • Works from any directory
  2. Layer Separation: Import from the same layer or lower layers only
     • ✅ Services → Domain (ok)
     • ✅ Application → Services (ok)
     • ❌ Domain → Services (VIOLATION)
     • ❌ Services → Application (VIOLATION)
  3. Dependency Injection: Pass dependencies via the constructor
     • Don't import concrete classes in the domain layer
     • Use protocols for abstractions (see the sketch after this list)
  4. Type Hints: Import types for annotations
     • from typing import List, Dict, Optional
     • Use them in function signatures
Key Design Patterns

  • Domain-Driven Design: Clear separation of domain, application, services, infrastructure
  • Chain of Responsibility: Enrichment pipeline (7 handlers)
  • Factory Pattern: Handler factory for dependency injection
  • Repository Pattern: Data access abstraction
  • Observer Pattern: Cost tracking, performance monitoring (see the sketch after this list)
  • Strategy Pattern: Multiple matching strategies (regex, API, LLM)
  • Service Layer: Business logic separate from infrastructure
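
💡 Illustrative Sketch - Observer Pattern for Cost Tracking

A minimal sketch of the observer idea applied to cost tracking; the class names and per-call cost below are assumptions and do not mirror cost_tracking_observer.py.

class CostTrackingObserver:
    """Accumulates API spend without the client knowing why (sketch)."""

    def __init__(self) -> None:
        self.total_cost = 0.0

    def on_api_call(self, cost: float) -> None:
        self.total_cost += cost


class ObservableApiClient:
    """Notifies observers on every call; cost tracking stays out of core logic."""

    def __init__(self, observers: list) -> None:
        self.observers = observers

    def get_event(self, event_id: int) -> dict:
        # ... the real HTTP request would happen here ...
        for obs in self.observers:
            obs.on_api_call(cost=0.001)  # illustrative per-call cost
        return {"id": event_id}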

Performance Hot Paths

Critical for speed:

  1. VOD filtering: infrastructure/parsers/vod_detector.py (91.7% reduction)
  2. Provider config loading: services/provider_config_manager.py (53x faster with YAML cache)
  3. Enhanced match cache: services/enhanced_match_cache.py (O(1) lookup)
  4. Event deduplication: services/event_deduplication.py (92% API call reduction)
  5. Regex matcher: services/regex_matcher.py (multi-stage: exact → fuzzy; sketch below)
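
💡 Illustrative Sketch - Multi-Stage Matching (Exact → Fuzzy)

A minimal sketch of the exact-then-fuzzy strategy from hot path 5, using difflib for the fuzzy stage; the real regex_matcher.py also has a team-extraction stage not shown here, and the 0.85 threshold is illustrative.

from difflib import SequenceMatcher
from typing import Dict, Optional


def match_multistage(payload: str, known_events: Dict[str, int]) -> Optional[int]:
    """Try a cheap exact lookup first, then fall back to fuzzy matching."""
    # Stage 1: exact match (O(1) dict lookup).
    if payload in known_events:
        return known_events[payload]

    # Stage 2: fuzzy match, only reached when the exact stage misses.
    best_id, best_score = None, 0.0
    for name, event_id in known_events.items():
        score = SequenceMatcher(None, payload.lower(), name.lower()).ratio()
        if score > best_score:
            best_id, best_score = event_id, score
    return best_id if best_score >= 0.85 else None  # threshold is illustrative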


For Deep Dives: Use your IDE's "Go to Definition" or grep -r "class_name" backend/epgoat/
For Architecture: See Documentation/03-Architecture/System-Overview.md