"""Scrape the DishTV channel guide with Selenium and write an XMLTV EPG file.

The script reads a channel mapping from ``tempest_config/dishtv.config.xml``
(next to this file), looks each channel up through the guide's search box,
collects the programme tiles shown for it and writes the result to
``tempest_config/epg/dishtv_epg.xml``.
"""

import os
import signal
import sys
import threading
import time
import xml.etree.ElementTree as ET
from datetime import datetime, date
from datetime import timedelta
from pathlib import Path
from xml.dom.minidom import parseString

import pytz
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from webdriver_manager.chrome import ChromeDriverManager

# Time zone the guide's clock times are interpreted in.
INDIA_TZ = pytz.timezone('Asia/Kolkata')
# Timestamp layout used for programme start/stop values in the EPG file.
EPG_TIME_FORMAT = "%Y%m%d%H%M%S +0000"

# Global flag for Ctrl+C handling
stop_flag = False


def signal_handler(sig, frame):
    """Handle SIGINT: record the stop request and exit the main thread.

    ``sys.exit`` raises ``SystemExit`` at whatever point the main thread is
    currently executing, so handlers elsewhere in this module catch
    ``Exception`` (never a bare ``except``) to let it propagate.
    """
    global stop_flag
    print("\nCtrl+C detected. Stopping gracefully...")
    stop_flag = True
    sys.exit(0)


signal.signal(signal.SIGINT, signal_handler)


def _quit_quietly(driver):
    """Quit *driver*, ignoring errors (e.g. the browser is already gone)."""
    try:
        driver.quit()
    except Exception:
        pass


def setup_driver():
    """Start Chrome configured for the Asia/Kolkata time zone and en-IN locale.

    Returns:
        The started ``webdriver.Chrome`` instance.

    Raises:
        Whatever Selenium / webdriver_manager raise when the browser cannot
        be started or configured. If configuration fails after the browser
        has started, the browser is quit before the error is re-raised.
    """
    options = webdriver.ChromeOptions()

    # Comprehensive timezone enforcement
    options.add_argument("--timezone=Asia/Kolkata")
    options.add_experimental_option("prefs", {
        "timezone": "Asia/Kolkata",
        "intl.accept_languages": "en-IN",
    })

    # Options to prevent timezone detection
    options.add_argument("--disable-geolocation")
    options.add_argument("--disable-notifications")
    options.add_argument("--disable-blink-features=AutomationControlled")
    options.add_argument("--window-size=1920,1080")
    options.add_argument("--disable-dev-shm-usage")
    options.add_argument("--no-sandbox")
    options.add_argument("--disable-gpu")
    options.add_argument("--disable-software-rasterizer")
    options.add_argument("--disable-webgl")
    options.add_argument("--disable-webgl2")
    options.add_argument("--disable-features=WebGPU")
    options.add_argument("--log-level=3")
    options.add_experimental_option('excludeSwitches', ['enable-logging'])

    # Force India locale
    options.add_argument("--lang=en-IN")

    driver = webdriver.Chrome(
        service=Service(ChromeDriverManager().install()),
        options=options,
    )

    # Override timezone and locale using Chrome DevTools Protocol.
    try:
        driver.execute_cdp_cmd(
            "Emulation.setTimezoneOverride", {"timezoneId": "Asia/Kolkata"})
        driver.execute_cdp_cmd(
            "Emulation.setLocaleOverride", {"locale": "en-IN"})
    except BaseException:
        # Do not leave an orphaned browser behind when configuration fails.
        _quit_quietly(driver)
        raise
    return driver


def countdown_timer(seconds):
    """Print a per-second countdown, then terminate the process.

    Returns early without terminating if ``stop_flag`` becomes true.
    ``os._exit`` is used because this runs in a background thread while the
    main thread is blocked in ``input()``.
    """
    for i in range(seconds, 0, -1):
        if stop_flag:
            return
        print(f"\rClosing in {i} seconds...", end="")
        time.sleep(1)
    print("\nClosing program automatically...")
    os._exit(0)


def get_channel_mapping():
    """Load the ``site_id -> xmltv_id`` mapping from the config file.

    Returns:
        A dict keyed by each ``<channel>`` element's ``site_id`` attribute
        with its ``xmltv_id`` attribute as value. Channels without a
        ``site_id`` are skipped. An empty dict is returned when the file is
        missing, unreadable or not well-formed XML.
    """
    # Define path to config file in tempest_config subdirectory
    config_path = Path(__file__).parent / "tempest_config" / "dishtv.config.xml"
    try:
        if not config_path.exists():
            raise FileNotFoundError(f"Config file not found at: {config_path}")
        tree = ET.parse(config_path)
    except (OSError, ET.ParseError) as e:
        print(f"Error loading channel mapping: {str(e)}")
        return {}
    return {
        channel.get("site_id"): channel.get("xmltv_id")
        for channel in tree.findall(".//channel")
        if channel.get("site_id")
    }


def _parse_clock_time(day_stamp, clock_str):
    """Parse a 12-hour clock string on *day_stamp* as an India-local datetime.

    Accepts ``"11:30 PM"`` style and, as a fallback, ``"11 pm"`` style.

    Raises:
        ValueError: if *clock_str* matches neither format.
    """
    for fmt in ("%Y%m%d %I:%M %p", "%Y%m%d %I %p"):
        try:
            naive = datetime.strptime(f"{day_stamp} {clock_str}", fmt)
        except ValueError:
            continue
        return INDIA_TZ.localize(naive)
    raise ValueError(f"unrecognised clock time {clock_str!r}")


def convert_time_format(time_str):
    """Convert a ``"<start> - <end>"`` time slot into display and EPG values.

    Both clock times are placed on today's date in Asia/Kolkata; if the end
    is earlier than the start the end is moved to the next day.

    Returns:
        ``(start_ist, end_ist, (start_gmt, end_gmt))`` where the IST values
        are ``"%I:%M %p"`` strings and the GMT values use the EPG timestamp
        format. On any parse failure returns ``(None, None, None)``.
    """
    try:
        if not time_str or " - " not in time_str:
            return None, None, None

        # Clean up the time string
        start_str, end_str = [s.strip() for s in time_str.strip().split(" - ")]

        # Get today's date in India timezone
        today_date = datetime.now(INDIA_TZ).strftime("%Y%m%d")

        start = _parse_clock_time(today_date, start_str)
        end = _parse_clock_time(today_date, end_str)

        # Handle overnight programs (end time is earlier than start time)
        if end < start:
            end = end + timedelta(days=1)

        # Convert to GMT for EPG format
        start_gmt = start.astimezone(pytz.utc).strftime(EPG_TIME_FORMAT)
        end_gmt = end.astimezone(pytz.utc).strftime(EPG_TIME_FORMAT)

        # Return both IST (for display) and GMT (for XML)
        start_ist = start.strftime("%I:%M %p")
        end_ist = end.strftime("%I:%M %p")
        return start_ist, end_ist, (start_gmt, end_gmt)
    except (ValueError, TypeError, AttributeError) as e:
        print(f"Time conversion error for '{time_str}': {str(e)}")
        return None, None, None


def safe_get_text(element, selector):
    """Return the text of the first CSS match under *element*, or ``None``."""
    try:
        elem = element.find_element(By.CSS_SELECTOR, selector)
        return elem.text if elem else None
    except Exception:
        return None


def close_overlays(driver):
    """Dismiss the ``wzrk`` overlay if it is displayed; otherwise do nothing."""
    try:
        overlay = driver.find_element(By.CSS_SELECTOR, "div.wzrk-overlay")
        if overlay.is_displayed():
            close_button = driver.find_element(
                By.CSS_SELECTOR, "button.wzrk-alert-close")
            close_button.click()
            time.sleep(1)
    except Exception:
        # Best effort: a missing overlay is the normal case.
        pass


def scroll_horizontally(driver):
    """Scroll the episode container to the right until it stops moving.

    Scrolls in 1000px steps and stops after five consecutive steps that do
    not change ``scrollLeft``.

    Returns:
        ``True`` when scrolling finished, ``False`` if the container could
        not be found or a script call failed.
    """
    try:
        scroll_container = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located(
                (By.CSS_SELECTOR,
                 "div.cmp-channelguide__channel-episodes-container")))
        last_position = driver.execute_script(
            "return arguments[0].scrollLeft", scroll_container)
        scroll_attempts = 0
        while scroll_attempts < 5:
            driver.execute_script(
                "arguments[0].scrollLeft += 1000;", scroll_container)
            time.sleep(1)
            new_position = driver.execute_script(
                "return arguments[0].scrollLeft", scroll_container)
            if new_position == last_position:
                scroll_attempts += 1
            else:
                scroll_attempts = 0
                last_position = new_position
        return True
    except Exception as e:
        print(f"Horizontal scrolling failed: {str(e)}")
        return False


def verify_page(driver):
    """Return ``True`` if the guide's search box is present within 5 seconds."""
    try:
        WebDriverWait(driver, 5).until(
            EC.presence_of_element_located(
                (By.CSS_SELECTOR, "#channel-guide-search")))
        return True
    except Exception:
        return False


def _collect_raw_programs(program_elements, channel_num, channel_label):
    """Turn programme tile elements into ``{"title", "start", "end"}`` dicts.

    Each tile's ``title`` attribute is split on ``|``; the first part (minus
    a ``Title:`` prefix) is the show name and the second (minus ``Time:``)
    is the time slot. Tiles that are placeholders, lack a title or time, or
    whose time cannot be converted are skipped.
    """
    raw_programs = []
    for program in program_elements:
        try:
            if 'empty-episode-time' in (program.get_attribute('class') or ''):
                continue
            title_attr = program.get_attribute('title')
            if not title_attr:
                continue
            parts = [p.strip() for p in title_attr.split('|')]
            if len(parts) < 3:
                continue
            show_name = parts[0].replace('Title:', '').strip()
            time_slot = parts[1].replace('Time:', '').strip()
            if not time_slot or not show_name:
                continue

            start_ist, end_ist, gmt_range = convert_time_format(time_slot)
            # convert_time_format signals failure with None in place of the
            # (start, end) pair, so it must not be unpacked blindly.
            if not gmt_range:
                continue
            start_gmt, end_gmt = gmt_range
            if not start_gmt or not end_gmt:
                continue

            raw_programs.append({
                "title": show_name,
                "start": start_gmt,
                "end": end_gmt,
            })
            print(f"Found: {channel_label} - {show_name} ({start_ist} - {end_ist})")
        except Exception as e:
            print(f"Skipping program on channel {channel_num} due to error: {str(e)}")
            continue
    return raw_programs


def _parse_epg_stamp(stamp):
    """Parse the date-time part of an EPG timestamp into a naive datetime."""
    return datetime.strptime(stamp.split()[0], "%Y%m%d%H%M%S")


def _repair_end_times(raw_programs):
    """Return programmes sorted by start, fixing ends not after their start.

    A programme whose end is not later than its start gets the next
    programme's start as its end, or start + 4 hours if it is the last one.
    """
    ordered = sorted(raw_programs, key=lambda item: item["start"])
    programs = []
    for index, prog in enumerate(ordered):
        try:
            start_dt = _parse_epg_stamp(prog["start"])
            end_dt = _parse_epg_stamp(prog["end"])
            end_stamp = prog["end"]
            if end_dt <= start_dt:
                if index < len(ordered) - 1:
                    # Use start time of next program as end time
                    next_start = _parse_epg_stamp(ordered[index + 1]["start"])
                    end_stamp = next_start.strftime(EPG_TIME_FORMAT)
                else:
                    # Last program of the day - add 4 hours
                    end_stamp = (start_dt + timedelta(hours=4)).strftime(
                        EPG_TIME_FORMAT)
            programs.append({
                "title": prog["title"],
                "start": prog["start"],
                "end": end_stamp,
            })
        except (KeyError, ValueError, AttributeError) as e:
            print(f"Skipping program due to error: {str(e)}")
            continue
    return programs


def search_and_process_channel(driver, channel_num, channel_map):
    """Search the guide for *channel_num* and collect its programmes.

    Args:
        driver: Selenium driver positioned on the channel guide page.
        channel_num: Channel number typed into the search box.
        channel_map: Mapping of channel number to display/XMLTV name, used
            for progress output.

    Returns:
        A list of ``{"title", "start", "end"}`` dicts sorted by start time;
        ``[]`` when the channel was found but yielded nothing or an error
        occurred; ``None`` when the page is no longer valid or the channel
        is not in the search results.

    The search box is cleared again before returning, whatever the outcome.
    """
    try:
        if not verify_page(driver):
            print("Error: Page has changed or is no longer valid")
            return None

        close_overlays(driver)

        search_box = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located(
                (By.CSS_SELECTOR, "#channel-guide-search")))
        search_box.clear()
        search_box.send_keys(channel_num)
        time.sleep(2)

        # NOTE(review): contains() also matches longer numbers that embed
        # this one (e.g. '#10' inside '#101'); the first match is used.
        channel_xpath = f"//p[contains(@class, 'cmp-channelguide__search-result-channel-num') and contains(text(), '#{channel_num}')]/ancestor::li"
        try:
            channel_element = WebDriverWait(driver, 3).until(
                EC.presence_of_element_located((By.XPATH, channel_xpath)))
        except Exception:
            print(f"Channel {channel_num} not found in search results")
            return None

        driver.execute_script("arguments[0].click();", channel_element)
        time.sleep(3)

        if not verify_page(driver):
            print("Error: Page navigation failed")
            return None

        channel_id = channel_element.get_attribute("data-channel-id")
        if not channel_id:
            print(f"Could not find channel ID for channel {channel_num}")
            return []

        if not scroll_horizontally(driver):
            return []
        time.sleep(1)

        program_elements = WebDriverWait(driver, 10).until(
            EC.presence_of_all_elements_located(
                (By.CSS_SELECTOR,
                 f"div.cmp-channelguide__channel-tile[data-channel-id='{channel_id}']")))

        raw_programs = _collect_raw_programs(
            program_elements,
            channel_num,
            channel_map.get(channel_num, channel_num),
        )
        # Sort programs by start time and fix missing/incorrect end times
        return _repair_end_times(raw_programs)
    except Exception as e:
        print(f"Could not process channel {channel_num}: {str(e)}")
        return []
    finally:
        try:
            search_box = driver.find_element(
                By.CSS_SELECTOR, "#channel-guide-search")
            search_box.clear()
            search_box.send_keys(Keys.ESCAPE)
            time.sleep(1)
        except Exception:
            pass


def process_channels_by_search(driver, channel_map):
    """Process every channel in *channel_map* through the guide search.

    Returns:
        ``(channels_data, found_channels)``: a dict of channel name to its
        programme list, and the set of channel numbers that yielded at
        least one programme. Processing stops early when ``stop_flag`` is
        set or the guide page is no longer valid.
    """
    channels_data = {}
    found_channels = set()
    print(f"Processing {len(channel_map)} channels via search...")

    for channel_num, channel_name in channel_map.items():
        if stop_flag:
            break
        print(f"\nSearching for channel #{channel_num} ({channel_name})...")
        programs = search_and_process_channel(driver, channel_num, channel_map)

        if programs is None:
            if not verify_page(driver):
                print("Error: Page has changed, stopping channel processing")
                break
            continue
        if programs:
            channels_data[channel_name] = programs
            found_channels.add(channel_num)

    return channels_data, found_channels


def _build_programme(channel_id, program):
    """Build a detached ``<programme>`` element, or ``None`` if invalid.

    *program* must be a dict with non-empty ``title``, ``start`` and ``end``.
    """
    if not isinstance(program, dict):
        return None
    if not all(program.get(field) for field in ("title", "start", "end")):
        return None
    programme = ET.Element("programme", {
        "start": str(program["start"]),
        "stop": str(program["end"]),
        "channel": str(channel_id),
    })
    ET.SubElement(programme, "title").text = str(program["title"])
    return programme


def _write_pretty_xml(root, destination):
    """Serialise *root* as indented XML and write it to *destination*."""
    xml_bytes = ET.tostring(root, encoding="utf-8", xml_declaration=True)
    pretty_xml = parseString(xml_bytes).toprettyxml()
    with open(destination, "w", encoding="utf-8") as f:
        f.write(pretty_xml)


def generate_epg(channels_data):
    """Write *channels_data* as an XMLTV file under ``tempest_config/epg``.

    Args:
        channels_data: Mapping of channel id to a list of programme dicts
            with ``title``, ``start`` and ``end`` keys.

    Returns:
        ``True`` if ``dishtv_epg.xml`` was written and is non-empty,
        otherwise ``False``. When generation fails after some elements were
        built, they are saved to ``dishtv_epg_partial.xml`` on a
        best-effort basis.
    """
    if not channels_data:
        print("Warning: No channel data provided for EPG generation")
        return False

    print("\nStarting EPG XML generation...")
    success = False
    channel_count = 0
    program_count = 0
    root = None

    # Define output path (tempest_config/epg subdirectory)
    output_dir = Path(__file__).parent / "tempest_config" / "epg"
    output_file = output_dir / "dishtv_epg.xml"

    try:
        # Create directory if it doesn't exist
        output_dir.mkdir(parents=True, exist_ok=True)

        root = ET.Element("tv", {
            "generator-info-name": "DishTV EPG Generator",
            "generator-info-url": "",
        })

        for channel_id, programs in channels_data.items():
            try:
                if not channel_id or not programs:
                    continue

                programme_elems = []
                for program in programs:
                    programme = _build_programme(channel_id, program)
                    if programme is not None:
                        programme_elems.append(programme)
                if not programme_elems:
                    continue

                channel_elem = ET.Element("channel", {"id": str(channel_id)})
                ET.SubElement(channel_elem, "display-name").text = str(channel_id)

                # XMLTV requires every <channel> to precede every
                # <programme>, so channels are inserted after the channels
                # already present and programmes are appended at the end.
                root.insert(channel_count, channel_elem)
                root.extend(programme_elems)
                channel_count += 1
                program_count += len(programme_elems)
            except Exception as channel_error:
                print(f"Error processing channel {channel_id}: {str(channel_error)}")
                continue

        if channel_count > 0 and program_count > 0:
            try:
                _write_pretty_xml(root, output_file)
            except OSError as file_error:
                print(f"Failed to write EPG file: {str(file_error)}")
            else:
                print(f"Successfully generated EPG with {channel_count} channels and {program_count} programs")
                print(f"EPG file saved to {output_file.absolute()}")
                if output_file.exists() and output_file.stat().st_size > 0:
                    print("EPG file verification successful")
                    success = True
                else:
                    print("Warning: EPG file appears to be empty")
        else:
            print("Warning: No valid EPG data to generate")
    except Exception as e:
        print(f"Critical error during EPG generation: {str(e)}")
        success = False

    # Only save a partial file when there is something in it.
    if not success and root is not None and len(root) > 0:
        fallback_file = output_dir / "dishtv_epg_partial.xml"
        try:
            _write_pretty_xml(root, fallback_file)
            print(f"Saved partial EPG data to {fallback_file}")
        except Exception as fallback_error:
            print(f"Failed to save partial data: {str(fallback_error)}")

    return success


def scrape_dishtv():
    """Run the full scrape: load config, drive the browser, write the EPG.

    Returns:
        The ``channels_data`` dict (possibly empty) on completion, or
        ``None`` when no channels are configured, a stop was requested
        during page load, or an error occurred. The browser is always quit
        before returning.
    """
    channel_map = get_channel_mapping()
    if not channel_map:
        # Nothing to look up, so do not start a browser at all.
        print("Error: No channels configured; nothing to scrape")
        return None

    target_ids = set(channel_map)
    print(f"Targeting {len(target_ids)} channels")

    driver = setup_driver()
    try:
        # Verify Chrome timezone
        print("Setting Chrome timezone to Asia/Kolkata...")
        timezone = driver.execute_script(
            "return Intl.DateTimeFormat().resolvedOptions().timeZone")
        print(f"Chrome timezone: {timezone} (should be Asia/Kolkata)")

        # Additional verification
        current_time = driver.execute_script("return new Date().toString()")
        print(f"Browser current time: {current_time}")

        for attempt in range(3):
            if stop_flag:
                return None
            try:
                print(f"Loading page (attempt {attempt + 1})")
                driver.get("https://www.dishtv.in/channel-guide.html")
                WebDriverWait(driver, 30).until(
                    EC.presence_of_element_located(
                        (By.CSS_SELECTOR, "#channel-guide-search")))
                break
            except Exception as e:
                print(f"Page load failed: {str(e)}")
                if attempt == 2:
                    raise
                time.sleep(5)

        channels_data, found_channels = process_channels_by_search(
            driver, channel_map)

        missing = target_ids - found_channels
        if missing:
            print(f"\nWarning: Could not find {len(missing)} channels:")
            for channel_id in sorted(missing):
                print(f"- {channel_map[channel_id]} ({channel_id})")

        if channels_data:
            # Only report success when the file was actually written.
            if generate_epg(channels_data):
                print(f"\nGenerated EPG with {len(channels_data)} channels and {sum(len(p) for p in channels_data.values())} programs")
            else:
                print("\nWarning: EPG generation did not complete successfully")

        return channels_data
    except Exception as e:
        print(f"Error during scraping: {str(e)}")
        return None
    finally:
        # Always release the browser, including after Ctrl+C; quitting a
        # browser that is already gone is harmless.
        _quit_quietly(driver)


def main():
    """Run the scraper, then wait for Enter or a 10-second auto-close."""
    print("DishTV EPG Scraper (Search + Horizontal Scrolling)")
    print("=================================================")

    try:
        result = scrape_dishtv()
        if result:
            print("\nOperation completed successfully!")
        else:
            print("\nOperation completed with some channels missing")
    except KeyboardInterrupt:
        print("\nOperation cancelled by user")
    except Exception as e:
        print(f"\nError: {str(e)}")

    timer_thread = threading.Thread(target=countdown_timer, args=(10,))
    timer_thread.daemon = True
    timer_thread.start()

    try:
        input("\nPress Enter to exit immediately or wait for automatic closure...")
        os._exit(0)
    except Exception:
        # e.g. EOFError when no stdin is attached; fall through and exit.
        pass


if __name__ == "__main__":
    main()