"""Scrape the DishTV channel guide page and write the result as an XMLTV file."""

import os
import signal
import sys
import threading
import time
import xml.etree.ElementTree as ET
from datetime import datetime, date, timedelta
from xml.dom.minidom import parseString

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from webdriver_manager.chrome import ChromeDriverManager

# Global flag for Ctrl+C handling
stop_flag = False

# Offset subtracted from the scraped times before they are labelled "+0000".
# NOTE(review): presumably the guide lists times in IST (UTC+05:30) - confirm.
GUIDE_UTC_OFFSET = timedelta(hours=5, minutes=30)

GUIDE_URL = "https://www.dishtv.in/channel-guide.html"
PAGE_LOAD_ATTEMPTS = 3


def signal_handler(sig, frame):
    """Handle SIGINT: set ``stop_flag`` and exit with status 0.

    ``sys.exit`` raises ``SystemExit`` in the main thread, so ``finally``
    blocks (such as the browser cleanup in ``scrape_dishtv``) still run.
    """
    global stop_flag
    print("\nCtrl+C detected. Stopping gracefully...")
    stop_flag = True
    sys.exit(0)


signal.signal(signal.SIGINT, signal_handler)


def setup_driver():
    """Create and return a Chrome WebDriver configured with the scraper's flags."""
    options = webdriver.ChromeOptions()
    chrome_arguments = [
        "--disable-blink-features=AutomationControlled",
        "--window-size=1920,1080",
        "--disable-dev-shm-usage",
        "--no-sandbox",
        "--disable-gpu",
        "--disable-software-rasterizer",
        "--disable-webgl",
        "--disable-webgl2",
        "--disable-features=WebGPU",
        "--log-level=3",
        "--disable-direct-composition",
        "--disable-direct-composition-layers",
    ]
    for argument in chrome_arguments:
        options.add_argument(argument)
    options.add_experimental_option('excludeSwitches', ['enable-logging'])
    return webdriver.Chrome(
        service=Service(ChromeDriverManager().install()),
        options=options,
    )


def countdown_timer(seconds):
    """Countdown timer that will exit the program after specified seconds.

    Returns early, without exiting, if ``stop_flag`` becomes true.
    """
    for remaining in range(seconds, 0, -1):
        if stop_flag:
            return
        print(f"\rClosing in {remaining} seconds...", end="")
        time.sleep(1)
    print("\nClosing program automatically...")
    # os._exit terminates immediately, even while the main thread is
    # blocked in input().
    os._exit(0)


def get_channel_mapping(xml_file):
    """Return a ``{site_id: xmltv_id}`` dict built from every ``<channel>``
    element found in ``xml_file``."""
    tree = ET.parse(xml_file)
    return {
        channel.get("site_id"): channel.get("xmltv_id")
        for channel in tree.findall(".//channel")
    }


def convert_time_format(time_str):
    """Convert a ``"HH:MM AM - HH:MM PM"`` slot into XMLTV timestamps.

    Both times are placed on today's local date, shifted back by
    ``GUIDE_UTC_OFFSET`` and formatted as ``YYYYMMDDHHMMSS +0000``. When the
    end time is earlier than the start time the slot is taken to cross
    midnight and the end is moved to the following day.

    Returns:
        ``(start, end)`` strings, or ``(None, None)`` when ``time_str`` is
        empty, not a string, or cannot be parsed.
    """
    if not isinstance(time_str, str) or " - " not in time_str:
        return None, None
    try:
        # More than two parts raises ValueError on unpacking, like a bad time.
        start_str, end_str = (part.strip() for part in time_str.split(" - "))
        today = date.today().strftime("%Y%m%d")
        start = datetime.strptime(f"{today} {start_str}", "%Y%m%d %I:%M %p")
        end = datetime.strptime(f"{today} {end_str}", "%Y%m%d %I:%M %p")
    except ValueError as e:
        print(f"Time conversion error: {e}")
        return None, None

    if end < start:
        # e.g. "11:30 PM - 12:30 AM": the programme ends on the next day.
        end += timedelta(days=1)

    stamp_format = "%Y%m%d%H%M%S +0000"
    start_gmt = (start - GUIDE_UTC_OFFSET).strftime(stamp_format)
    end_gmt = (end - GUIDE_UTC_OFFSET).strftime(stamp_format)
    return start_gmt, end_gmt


def safe_get_text(element, selector):
    """Return the text of the first descendant of ``element`` matching the CSS
    ``selector``, or ``None`` if the lookup fails."""
    try:
        return element.find_element(By.CSS_SELECTOR, selector).text
    except Exception:
        # Best-effort lookup; SystemExit/KeyboardInterrupt are deliberately
        # not caught so Ctrl+C is never swallowed here.
        return None


def scroll_to_load_all_content(driver, max_idle_rounds=30, pause_seconds=3):
    """Scroll the channel container to the bottom until its height stops growing.

    Args:
        driver: WebDriver with the channel guide page loaded.
        max_idle_rounds: Number of consecutive scrolls without a height change
            after which scrolling stops.
        pause_seconds: Seconds to wait after each scroll before re-measuring.
    """
    container = driver.find_element(By.CSS_SELECTOR, "#channel-cards-container")
    last_height = driver.execute_script("return arguments[0].scrollHeight", container)
    idle_rounds = 0

    while idle_rounds < max_idle_rounds and not stop_flag:
        driver.execute_script("arguments[0].scrollTop = arguments[0].scrollHeight", container)
        time.sleep(pause_seconds)

        new_height = driver.execute_script("return arguments[0].scrollHeight", container)
        if new_height == last_height:
            idle_rounds += 1
        else:
            # New content appeared, so start counting idle rounds again.
            last_height = new_height
            idle_rounds = 0

    program_elements = driver.find_elements(By.CSS_SELECTOR, "div.cmp-channelguide__channel-tile")
    print(f"Found {len(program_elements)} programs after scrolling")


def process_all_channels(driver, channel_map):
    """Collect programme data from every channel tile on the page.

    Args:
        driver: WebDriver with the fully scrolled channel guide loaded.
        channel_map: ``{site_id: xmltv_id}`` mapping; tiles whose
            ``data-channel-id`` is not a key are ignored.

    Returns:
        ``(channels_data, found_channels)`` where ``channels_data`` maps each
        xmltv id to a list of ``{"title", "start", "end"}`` dicts and
        ``found_channels`` is the set of site ids that produced a programme.
    """
    channels_data = {}
    found_channels = set()

    program_elements = driver.find_elements(By.CSS_SELECTOR, "div.cmp-channelguide__channel-tile")
    print(f"Processing {len(program_elements)} programs...")

    for element in program_elements:
        if stop_flag:
            break
        try:
            channel_id = element.get_attribute("data-channel-id")
            if not channel_id or channel_id not in channel_map:
                continue

            driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", element)
            time.sleep(0.1)

            try:
                time_slot = safe_get_text(element, "span.cmp-channelguide__channel-time")
                show_name = safe_get_text(element, "span.cmp-channel__episode-name")
                if not time_slot or not show_name:
                    continue

                start, end = convert_time_format(time_slot)
                if not start or not end:
                    continue

                channel_name = channel_map[channel_id]
                channels_data.setdefault(channel_name, []).append({
                    "title": show_name,
                    "start": start,
                    "end": end,
                })
                found_channels.add(channel_id)

                print(f"Found: {channel_name} - {show_name} ({time_slot})")
            except Exception as e:
                print(f"Skipping program on channel {channel_id} due to error: {str(e)}")
                continue
        except Exception as e:
            print(f"Error processing element: {str(e)}")
            continue

    return channels_data, found_channels


def scrape_dishtv():
    """Run the full scrape and write the EPG file.

    Reads the channel mapping from ``dishtv.config.xml``, loads the guide page
    (up to ``PAGE_LOAD_ATTEMPTS`` tries), scrolls it, extracts programmes and
    calls ``generate_epg``.

    Returns:
        The ``channels_data`` dict, or ``None`` if scraping was interrupted
        or failed.
    """
    channel_map = get_channel_mapping("dishtv.config.xml")
    target_ids = set(channel_map.keys())
    print(f"Targeting {len(target_ids)} channels")

    driver = setup_driver()
    try:
        for attempt in range(PAGE_LOAD_ATTEMPTS):
            if stop_flag:
                return None
            try:
                print(f"Loading page (attempt {attempt + 1})")
                driver.get(GUIDE_URL)
                WebDriverWait(driver, 30).until(
                    EC.presence_of_element_located((By.CSS_SELECTOR, "#channel-cards-container")))
                break
            except Exception as e:
                print(f"Page load failed: {str(e)}")
                if attempt == PAGE_LOAD_ATTEMPTS - 1:
                    raise
                time.sleep(5)

        print("Scrolling to load all content (this may take a few minutes)...")
        scroll_to_load_all_content(driver)

        channels_data, found_channels = process_all_channels(driver, channel_map)

        missing = target_ids - found_channels
        if missing:
            print(f"\nWarning: Could not find {len(missing)} channels:")
            for channel_id in missing:
                print(f"- {channel_map[channel_id]} ({channel_id})")

        # Only report the summary when the file was actually written.
        if channels_data and generate_epg(channels_data):
            print(f"\nGenerated EPG with {len(channels_data)} channels and {sum(len(p) for p in channels_data.values())} programs")

        return channels_data

    except Exception as e:
        print(f"Error during scraping: {str(e)}")
        return None
    finally:
        # Always shut the browser down, including on Ctrl+C; skipping this
        # leaves Chrome/chromedriver processes running.
        try:
            driver.quit()
        except Exception as quit_error:
            print(f"Warning: failed to close browser: {quit_error}")


def _build_epg_tree(channels_data):
    """Build the ``<tv>`` tree; return ``(root, channel_count, program_count)``.

    All ``<channel>`` elements are placed before any ``<programme>`` element.
    Channels with an empty id or no valid programme are left out.
    """
    root = ET.Element("tv", {
        "generator-info-name": "DishTV EPG Generator",
        "generator-info-url": ""
    })
    required_fields = ('title', 'start', 'end')
    channel_elems = []
    programme_elems = []

    for channel_id, programs in channels_data.items():
        try:
            if not channel_id:
                print(f"Skipping channel with empty ID (programs: {len(programs) if programs else 0})")
                continue
            if not programs:
                continue

            channel_programmes = []
            for program in programs:
                try:
                    if not isinstance(program, dict):
                        continue
                    # Every required field must be present and non-empty.
                    if not all(program.get(field) for field in required_fields):
                        continue

                    programme = ET.Element("programme", {
                        "start": str(program["start"]),
                        "stop": str(program["end"]),
                        "channel": str(channel_id)
                    })
                    ET.SubElement(programme, "title").text = str(program["title"])
                    channel_programmes.append(programme)
                except Exception as prog_error:
                    print(f"Skipping invalid program on channel {channel_id}: {str(prog_error)}")
                    continue

            if not channel_programmes:
                continue

            channel_elem = ET.Element("channel", {"id": str(channel_id)})
            ET.SubElement(channel_elem, "display-name").text = str(channel_id)
            channel_elems.append(channel_elem)
            programme_elems.extend(channel_programmes)
        except Exception as channel_error:
            print(f"Error processing channel {channel_id}: {str(channel_error)}")
            continue

    root.extend(channel_elems)
    root.extend(programme_elems)
    return root, len(channel_elems), len(programme_elems)


def generate_epg(channels_data):
    """Write ``channels_data`` to ``dishtv_epg.xml`` in XMLTV layout.

    Args:
        channels_data: Mapping of channel id to a list of programme dicts
            with ``title``, ``start`` and ``end`` keys. Invalid entries are
            skipped rather than aborting the run.

    Returns:
        ``True`` if the file was written and is non-empty, otherwise
        ``False``. On failure, any elements already built are saved
        un-prettified to ``dishtv_epg_partial.xml``.
    """
    if not channels_data:
        print("Warning: No channel data provided for EPG generation")
        return False

    print("\nStarting EPG XML generation...")

    output_file = "dishtv_epg.xml"
    success = False
    root = None

    try:
        root, channel_count, program_count = _build_epg_tree(channels_data)

        if channel_count > 0 and program_count > 0:
            xml_bytes = ET.tostring(root, encoding="utf-8")
            pretty_xml = parseString(xml_bytes).toprettyxml()

            try:
                with open(output_file, "w", encoding="utf-8") as f:
                    f.write(pretty_xml)
                print(f"Successfully generated EPG with {channel_count} channels and {program_count} programs")
                success = True

                if os.path.exists(output_file) and os.path.getsize(output_file) > 0:
                    print(f"EPG file saved to {output_file}")
                else:
                    success = False
                    print("Warning: EPG file appears to be empty")
            except OSError as file_error:
                print(f"Failed to write EPG file: {str(file_error)}")
                success = False
        else:
            print("Warning: No valid EPG data to generate")

    except Exception as e:
        print(f"Critical error during EPG generation: {str(e)}")
        success = False

    # Fallback: save whatever was built. Written straight from ElementTree so
    # it does not repeat the minidom step that may have just failed, and
    # skipped when the tree is empty.
    if not success and root is not None and len(root) > 0:
        fallback_file = "dishtv_epg_partial.xml"
        try:
            ET.ElementTree(root).write(fallback_file, encoding="utf-8", xml_declaration=True)
            print(f"Saved partial EPG data to {fallback_file}")
        except Exception as fallback_error:
            print(f"Failed to save partial data: {str(fallback_error)}")

    return success


def main():
    """Run the scraper, then wait for Enter or a 60-second auto-close timer."""
    print("DishTV EPG Scraper")
    print("===================")

    try:
        result = scrape_dishtv()
        if result:
            print("\nOperation completed successfully!")
        else:
            print("\nOperation completed with some channels missing")
    except KeyboardInterrupt:
        print("\nOperation cancelled by user")
    except Exception as e:
        print(f"\nError: {str(e)}")

    # Start countdown timer in a separate thread
    timer_thread = threading.Thread(target=countdown_timer, args=(60,), daemon=True)
    timer_thread.start()

    # Wait for user input or timer to complete
    try:
        input("\nPress Enter to exit immediately or wait for automatic closure...")
        os._exit(0)
    except (EOFError, KeyboardInterrupt):
        pass  # In case input is interrupted


if __name__ == "__main__":
    main()