#!/usr/bin/env python3
"""
FreeDishToday EPG Scraper with Complete Fixes

Scrapes program schedules from freedishtoday.com and generates an XMLTV EPG file.
"""

import requests
from bs4 import BeautifulSoup
import xml.etree.ElementTree as ET
from xml.dom.minidom import parseString
from datetime import datetime, timedelta
import pytz
from pathlib import Path
import time
import os
import re
import threading
import sys
import signal
import random

# Global flag for Ctrl+C handling
stop_flag = False

# Configuration constants
MAX_RETRIES = 3
INITIAL_RETRY_DELAY = 5
MAX_RETRY_DELAY = 30
REQUEST_TIMEOUT = 30
USER_AGENT = ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
              '(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36')
DEFAULT_PROGRAM_DURATION = timedelta(hours=4)  # Used when an end time can't be determined


def signal_handler(sig, frame):
    """Handle Ctrl+C interrupt gracefully."""
    global stop_flag
    print("\nCtrl+C detected. Stopping gracefully...")
    stop_flag = True
    sys.exit(0)


signal.signal(signal.SIGINT, signal_handler)


def countdown_timer(seconds):
    """Display a countdown before the program exits automatically."""
    for i in range(seconds, 0, -1):
        if stop_flag:
            return
        print(f"\rClosing in {i} seconds...", end="")
        time.sleep(1)
    print("\nClosing program automatically...")
    os._exit(0)


def get_channel_mapping():
    """Load the site_id -> channel mapping from the config file."""
    config_path = Path(__file__).parent / "tempest_config" / "freedishtoday.config.xml"
    try:
        if not config_path.exists():
            raise FileNotFoundError(f"Config file not found at: {config_path}")
        tree = ET.parse(config_path)
        return {
            channel.get("site_id"): {
                "xmltv_id": channel.get("xmltv_id"),
                "display_name": channel.text.strip() if channel.text else channel.get("xmltv_id"),
            }
            for channel in tree.findall(".//channel")
        }
    except Exception as e:
        print(f"Error loading channel mapping: {e}")
        return {}
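
# Illustrative layout of freedishtoday.config.xml. The root element name and the
# concrete ids below are assumptions for the example; get_channel_mapping() only
# looks for <channel> descendants and reads site_id, xmltv_id, and the text content:
#
#   <?xml version="1.0" encoding="utf-8"?>
#   <site>
#     <channel site_id="dd-national-schedule" xmltv_id="DDNational.in">DD National</channel>
#     <channel site_id="dd-news-schedule" xmltv_id="DDNews.in">DD News</channel>
#   </site>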

def parse_datetime(date_str, time_str):
    """Parse combined date and time strings into a timezone-aware datetime."""
    try:
        # Clean and normalize the strings
        date_str = re.sub(r'\s+', ' ', date_str.strip())
        time_str = re.sub(r'\s+', ' ', time_str.strip())

        # Normalize "00:xx" to "12:xx" so midnight times parse with %I (12-hour clock)
        if time_str.startswith('00:'):
            time_str = '12' + time_str[2:]

        # Parse the date (handles both "Mar" and "March" formats)
        india_tz = pytz.timezone('Asia/Kolkata')
        try:
            date_obj = datetime.strptime(date_str, "%d %b %Y")
        except ValueError:
            date_obj = datetime.strptime(date_str, "%d %B %Y")

        # Parse the time (e.g., "12:46 AM")
        time_obj = datetime.strptime(time_str, "%I:%M %p").time()

        # Combine date and time
        combined = datetime.combine(date_obj.date(), time_obj)
        return india_tz.localize(combined)
    except Exception as e:
        print(f"DateTime parsing error for '{date_str} {time_str}': {e}")
        return None


def extract_date_from_header(header_text):
    """Extract a date like '15 Mar 2025' or '15 March 2025' from header text."""
    match = re.search(
        r'(\d{1,2}\s+(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec'
        r'|January|February|March|April|June|July|August|September|October|November|December)'
        r'\s+\d{4})',
        header_text)
    if match:
        return match.group(1)
    return None


def make_request_with_retry(url, headers, max_retries=MAX_RETRIES):
    """Make an HTTP GET request with exponential-backoff retries."""
    retry_delay = INITIAL_RETRY_DELAY
    for attempt in range(max_retries + 1):
        try:
            if stop_flag:
                return None
            if attempt > 0:
                # Add a little jitter so repeated retries don't line up exactly
                actual_delay = retry_delay + random.uniform(-2, 2)
                print(f"Attempt {attempt + 1}/{max_retries + 1}. "
                      f"Waiting {actual_delay:.1f} seconds before retry...")
                time.sleep(actual_delay)
                retry_delay = min(retry_delay * 2, MAX_RETRY_DELAY)
            response = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
            response.raise_for_status()
            return response
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 504 and attempt < max_retries:
                print(f"504 Gateway Timeout for {url}. Will retry...")
                continue
            raise
        except requests.exceptions.RequestException as e:
            # Covers ConnectionError, Timeout, and other request failures
            if attempt < max_retries:
                print(f"Request error ({type(e).__name__}) for {url}. Will retry...")
                continue
            raise
    return None


def process_program_times(programs_by_date):
    """Resolve end times, handling overnight programs and missing end times."""
    processed_programs = []
    # Sort days chronologically by their earliest program start; a plain string
    # sort would order "1 Apr 2025" before "15 Mar 2025". Empty days are skipped.
    dates = sorted(
        (d for d in programs_by_date if programs_by_date[d]),
        key=lambda d: min(p["start"] for p in programs_by_date[d]),
    )

    for i, date in enumerate(dates):
        # Sort each day's programs by start time before pairing neighbors
        date_programs = sorted(programs_by_date[date], key=lambda p: p["start"])

        for j, program in enumerate(date_programs):
            if j < len(date_programs) - 1:
                # Normal case: end time is the next program's start time
                end_time = date_programs[j + 1]["start"]
            elif i < len(dates) - 1:
                # Last program of the day: end at the earliest start on the next day
                end_time = min(p["start"] for p in programs_by_date[dates[i + 1]])
            else:
                # No next day available: fall back to the default duration
                end_time = program["start"] + DEFAULT_PROGRAM_DURATION

            # Ensure the end time is after the start time (overnight programs)
            if end_time < program["start"]:
                end_time += timedelta(days=1)

            # Convert to UTC in XMLTV's timestamp format
            start_gmt = program["start"].astimezone(pytz.utc).strftime("%Y%m%d%H%M%S +0000")
            end_gmt = end_time.astimezone(pytz.utc).strftime("%Y%m%d%H%M%S +0000")

            processed_programs.append({
                "title": program["title"],
                "start": start_gmt,
                "end": end_gmt,
                "genre": program["genre"],
                "date": program["start"].date(),
            })

    return processed_programs
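
# Worked example of the end-time rules above (hypothetical listings):
#   15 Mar: [10:00 News, 23:30 Movie]; 16 Mar: [09:00 Yoga]
#     News  -> ends 23:30 on 15 Mar (next programme the same day)
#     Movie -> ends 09:00 on 16 Mar (first programme of the next day)
#     Yoga  -> ends 13:00 on 16 Mar (last known day: start + 4 h default)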

def scrape_channel_schedule(channel_url, channel_info):
    """Scrape the schedule for a single channel from freedishtoday.com."""
    try:
        if stop_flag:
            return []

        headers = {'User-Agent': USER_AGENT}
        print(f"Fetching: {channel_url}")
        response = make_request_with_retry(channel_url, headers)
        if not response:
            print(f"Failed to fetch data for {channel_info['display_name']} after retries")
            return []

        soup = BeautifulSoup(response.text, 'html.parser')
        entry_content = soup.find('div', class_='entry-content')
        if not entry_content:
            print(f"No entry content found for {channel_info['display_name']}")
            return []

        programs_by_date = {}
        current_date = None

        # Walk the entry content: date headers are centered <p> elements,
        # schedules are tables inside <figure class="wp-block-table">
        for element in entry_content.children:
            if stop_flag:
                return []

            if element.name == 'p' and 'has-text-align-center' in element.get('class', []):
                header_text = element.get_text(strip=True)
                date_match = extract_date_from_header(header_text)
                if date_match:
                    current_date = date_match
                    if current_date not in programs_by_date:
                        programs_by_date[current_date] = []

            elif element.name == 'figure' and 'wp-block-table' in element.get('class', []):
                if not current_date:
                    continue
                table = element.find('table')
                if not table:
                    continue

                # Process each row in the table
                for row in table.find_all('tr'):
                    if stop_flag:
                        return []

                    cols = row.find_all('td')
                    if len(cols) < 1:
                        continue

                    # Title, genre, and time are stacked as separate lines
                    # inside the first cell
                    cell = cols[0]
                    cell_text = cell.get_text('\n', strip=True)
                    lines = [line.strip() for line in cell_text.split('\n') if line.strip()]
                    if len(lines) < 2:
                        continue

                    # Clean stray <strong> fragments out of the title
                    title = re.sub(r'^\s*strong>\s*|\s*</?strong>\s*', '', lines[0])

                    genre = lines[1]

                    # Find the start time in the remaining lines
                    time_str = None
                    for line in lines[2:]:
                        time_match = re.search(r'(\d{1,2}:\d{2}\s*[AP]M)', line, re.IGNORECASE)
                        if time_match:
                            time_str = time_match.group(1)
                            break
                    if not time_str:
                        continue

                    dt = parse_datetime(current_date, time_str)
                    if not dt:
                        continue

                    programs_by_date[current_date].append({
                        "title": title,
                        "start": dt,
                        "genre": genre,
                    })

        # Resolve end times across the collected days
        return process_program_times(programs_by_date)

    except Exception as e:
        print(f"Error scraping {channel_info['display_name']}: {e}")
        return []


def generate_epg(channels_data):
    """Generate an XMLTV-format EPG file."""
    if not channels_data:
        print("Warning: No channel data provided for EPG generation")
        return False

    print("\nStarting EPG XML generation...")
    success = False
    channel_count = 0
    program_count = 0

    output_dir = Path(__file__).parent / "tempest_config" / "epg"
    output_file = output_dir / "freedishtoday_epg.xml"

    try:
        output_dir.mkdir(parents=True, exist_ok=True)

        root = ET.Element("tv", {
            "generator-info-name": "FreeDishToday EPG Generator",
            "generator-info-url": "",
        })

        for channel_id, channel_info in channels_data.items():
            try:
                programs = channel_info['programs']
                if not programs:
                    continue

                channel_elem = ET.SubElement(root, "channel", {"id": channel_info['xmltv_id']})
                ET.SubElement(channel_elem, "display-name").text = channel_info['display_name']
                channel_count += 1

                for program in programs:
                    try:
                        if not all(k in program for k in ('title', 'start', 'end')):
                            continue
                        programme = ET.SubElement(root, "programme", {
                            "start": program["start"],
                            "stop": program["end"],
                            "channel": channel_info['xmltv_id'],
                        })
                        ET.SubElement(programme, "title", {'lang': 'en'}).text = program["title"]
                        if program.get("genre"):
                            ET.SubElement(programme, "category", {'lang': 'en'}).text = program["genre"]
                        program_count += 1
                    except Exception:
                        continue
            except Exception as e:
                print(f"Error processing channel {channel_info['xmltv_id']}: {e}")
                continue

        if channel_count > 0 and program_count > 0:
            xml_string = ET.tostring(root, encoding="utf-8", xml_declaration=True)
            pretty_xml = parseString(xml_string).toprettyxml()
            with open(output_file, "w", encoding="utf-8") as f:
                f.write(pretty_xml)
            print(f"Successfully generated EPG with {channel_count} channels "
                  f"and {program_count} programs")
            print(f"EPG file saved to {output_file.absolute()}")
            success = True
    except Exception as e:
        print(f"Error during EPG generation: {e}")
        success = False

    return success


def scrape_freedishtoday():
    """Main scraping entry point."""
    channel_map = get_channel_mapping()
    if not channel_map:
        print("Error: No channel mappings found")
        return None

    channels_data = {}
    for site_id, channel_info in channel_map.items():
        if stop_flag:
            break
        print(f"\nScraping {channel_info['display_name']}...")
        channel_url = f"https://freedishtoday.com/{site_id}/"
        programs = scrape_channel_schedule(channel_url, channel_info)
        if programs:
            channels_data[site_id] = {
                'xmltv_id': channel_info['xmltv_id'],
                'display_name': channel_info['display_name'],
                'programs': programs,
            }
            print(f"Found {len(programs)} programs for {channel_info['display_name']}")
        else:
            print(f"No programs found for {channel_info['display_name']}")

    if channels_data and not stop_flag:
        generate_epg(channels_data)

    return channels_data
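
# Sketch of the XMLTV document generate_epg() writes (the channel id, times,
# and titles below are hypothetical):
#
#   <tv generator-info-name="FreeDishToday EPG Generator" generator-info-url="">
#     <channel id="DDNational.in">
#       <display-name>DD National</display-name>
#     </channel>
#     <programme start="20250315043000 +0000" stop="20250315063000 +0000" channel="DDNational.in">
#       <title lang="en">Morning Show</title>
#       <category lang="en">Entertainment</category>
#     </programme>
#   </tv>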
print("FreeDishToday EPG Scraper with Complete Fixes") print("============================================") print(f"Configuration: Max Retries={MAX_RETRIES}, Initial Delay={INITIAL_RETRY_DELAY}s") print(f"Max Delay={MAX_RETRY_DELAY}s, Timeout={REQUEST_TIMEOUT}s\n") start_time = time.time() result = scrape_freedishtoday() elapsed = time.time() - start_time if result: print(f"\nOperation completed successfully in {elapsed:.2f} seconds!") print(f"Scraped {len(result)} channels") else: print("\nOperation completed with errors") # Start countdown timer in background timer_thread = threading.Thread(target=countdown_timer, args=(10,)) timer_thread.daemon = True timer_thread.start() try: input("\nPress Enter to exit immediately or wait for automatic closure...") os._exit(0) except: pass