"""DishTV EPG scraper.

Drives Chrome through Selenium to search the DishTV channel guide for every
channel listed in ``dishtv.config.xml``, reads the programme tiles shown for
each one and writes the result to ``dishtv_epg.xml`` as an XMLTV document.
"""

import os
import signal
import sys
import threading
import time
import xml.etree.ElementTree as ET
from datetime import datetime, date, timedelta
from xml.dom.minidom import parseString

import pytz
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from webdriver_manager.chrome import ChromeDriverManager

# Global flag for Ctrl+C handling
stop_flag = False


def signal_handler(sig, frame):
    """Handle SIGINT: set ``stop_flag`` and exit via ``SystemExit``.

    ``sys.exit`` raises ``SystemExit`` in the main thread, so ``finally``
    blocks that are currently active still run on the way out.
    """
    global stop_flag
    print("\nCtrl+C detected. Stopping gracefully...")
    stop_flag = True
    sys.exit(0)


signal.signal(signal.SIGINT, signal_handler)


def setup_driver():
    """Create a Chrome WebDriver configured for the Asia/Kolkata timezone.

    Returns:
        The started ``webdriver.Chrome`` instance with timezone and locale
        overrides applied through the Chrome DevTools Protocol.

    Raises:
        Exception: whatever Selenium raises while starting Chrome or applying
            the overrides. If an override fails the driver is quit first so
            that no browser process is left behind.
    """
    options = webdriver.ChromeOptions()

    # Timezone / locale requested through flag and prefs; the CDP overrides
    # further down are applied as well.
    options.add_argument("--timezone=Asia/Kolkata")
    options.add_experimental_option("prefs", {
        "timezone": "Asia/Kolkata",
        "intl.accept_languages": "en-IN",
    })

    # Options to prevent timezone detection, plus headless-friendly switches.
    for switch in (
        "--disable-geolocation",
        "--disable-notifications",
        "--disable-blink-features=AutomationControlled",
        "--window-size=1920,1080",
        "--disable-dev-shm-usage",
        "--no-sandbox",
        "--disable-gpu",
        "--disable-software-rasterizer",
        "--disable-webgl",
        "--disable-webgl2",
        "--disable-features=WebGPU",
        "--log-level=3",
    ):
        options.add_argument(switch)
    options.add_experimental_option('excludeSwitches', ['enable-logging'])

    # Force India locale
    options.add_argument("--lang=en-IN")

    driver = webdriver.Chrome(
        service=Service(ChromeDriverManager().install()),
        options=options,
    )

    # Override timezone and locale using Chrome DevTools Protocol. Chrome is
    # already running here, so shut it down if an override is rejected.
    try:
        driver.execute_cdp_cmd(
            "Emulation.setTimezoneOverride", {"timezoneId": "Asia/Kolkata"})
        driver.execute_cdp_cmd(
            "Emulation.setLocaleOverride", {"locale": "en-IN"})
    except Exception:
        driver.quit()
        raise
    return driver


def countdown_timer(seconds):
    """Print a once-per-second countdown, then terminate the process.

    Returns early, without terminating, as soon as ``stop_flag`` is set.
    Otherwise ends the process with ``os._exit(0)`` after the last second.
    """
    for i in range(seconds, 0, -1):
        if stop_flag:
            return
        print(f"\rClosing in {i} seconds...", end="")
        time.sleep(1)
    print("\nClosing program automatically...")
    os._exit(0)


def get_channel_mapping(xml_file):
    """Return ``{site_id: xmltv_id}`` for every ``<channel>`` in *xml_file*."""
    tree = ET.parse(xml_file)
    return {
        channel.get("site_id"): channel.get("xmltv_id")
        for channel in tree.findall(".//channel")
    }


def convert_time_format(time_str):
    """Convert an IST time slot into XMLTV UTC timestamps.

    Args:
        time_str: slot text of the form ``"HH:MM AM - HH:MM PM"``.

    Returns:
        ``(start, stop)`` as ``"YYYYmmddHHMMSS +0000"`` strings, or
        ``(None, None)`` when *time_str* is empty or cannot be parsed.

    Both times are placed on today's date in Asia/Kolkata. When the end time
    is not later than the start time the slot is taken to cross midnight and
    the end is moved to the following day, so the stop never precedes the
    start.
    """
    if not time_str or " - " not in time_str:
        return None, None
    try:
        start_str, end_str = time_str.split(" - ")

        # Parse time string as IST regardless of system time
        india_tz = pytz.timezone('Asia/Kolkata')
        today = datetime.now(india_tz).strftime("%Y%m%d")
        start_naive = datetime.strptime(
            f"{today} {start_str.strip()}", "%Y%m%d %I:%M %p")
        end_naive = datetime.strptime(
            f"{today} {end_str.strip()}", "%Y%m%d %I:%M %p")

        # e.g. "11:30 PM - 12:30 AM": the end belongs to the next day.
        if end_naive <= start_naive:
            end_naive += timedelta(days=1)

        start = india_tz.localize(start_naive)
        end = india_tz.localize(end_naive)

        # Convert to GMT for EPG format
        stamp = "%Y%m%d%H%M%S +0000"
        return (
            start.astimezone(pytz.utc).strftime(stamp),
            end.astimezone(pytz.utc).strftime(stamp),
        )
    except ValueError as e:
        print(f"Time conversion error: {e}")
        return None, None


def safe_get_text(element, selector):
    """Return the text of the first child matching *selector*, else ``None``.

    Lookup failures are treated as "no text". ``SystemExit`` and
    ``KeyboardInterrupt`` are deliberately not swallowed.
    """
    try:
        elem = element.find_element(By.CSS_SELECTOR, selector)
        return elem.text if elem else None
    except Exception:
        return None


def close_overlays(driver):
    """Best-effort dismissal of the ``wzrk`` overlay when it is displayed."""
    try:
        overlay = driver.find_element(By.CSS_SELECTOR, "div.wzrk-overlay")
        if overlay.is_displayed():
            close_button = driver.find_element(
                By.CSS_SELECTOR, "button.wzrk-alert-close")
            close_button.click()
            time.sleep(1)
    except Exception:
        # No overlay, or it could not be closed: carry on regardless.
        pass


def scroll_horizontally(driver):
    """Scroll the episodes container to the right until it stops moving.

    The container is advanced by 1000 px per step. Scrolling ends after five
    consecutive steps in which ``scrollLeft`` did not change.

    Returns:
        ``True`` once scrolling has finished, ``False`` if any step raised.
    """
    try:
        scroll_container = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located(
                (By.CSS_SELECTOR,
                 "div.cmp-channelguide__channel-episodes-container")))
        last_position = driver.execute_script(
            "return arguments[0].scrollLeft", scroll_container)
        scroll_attempts = 0
        while scroll_attempts < 5:
            driver.execute_script(
                "arguments[0].scrollLeft += 1000;", scroll_container)
            time.sleep(1)
            new_position = driver.execute_script(
                "return arguments[0].scrollLeft", scroll_container)
            if new_position == last_position:
                scroll_attempts += 1
            else:
                scroll_attempts = 0
                last_position = new_position
        return True
    except Exception as e:
        print(f"Horizontal scrolling failed: {str(e)}")
        return False


def verify_page(driver):
    """Return ``True`` if the guide's search box appears within 5 seconds."""
    try:
        WebDriverWait(driver, 5).until(
            EC.presence_of_element_located(
                (By.CSS_SELECTOR, "#channel-guide-search")))
        return True
    except Exception:
        return False


def search_and_process_channel(driver, channel_num, channel_map):
    """Search for one channel and collect the programmes shown for it.

    Args:
        driver: Selenium driver positioned on the channel guide page.
        channel_num: channel number typed into the search box; must be a key
            of *channel_map*.
        channel_map: ``{channel_num: name}``, used for progress output.

    Returns:
        ``None`` when the page is not valid or the channel is not among the
        search results; ``[]`` when the channel was found but processing
        failed; otherwise a list of ``{"title", "start", "end"}`` dicts.

    The search box is cleared and sent ESC on every exit path (best effort).
    """
    try:
        if not verify_page(driver):
            print("Error: Page has changed or is no longer valid")
            return None

        close_overlays(driver)

        search_box = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located(
                (By.CSS_SELECTOR, "#channel-guide-search")))
        search_box.clear()
        search_box.send_keys(channel_num)
        time.sleep(2)

        # NOTE(review): contains() is a substring match, so '#5' would also
        # match a '#50' result -- confirm against the site's markup.
        channel_xpath = f"//p[contains(@class, 'cmp-channelguide__search-result-channel-num') and contains(text(), '#{channel_num}')]/ancestor::li"
        try:
            channel_element = WebDriverWait(driver, 3).until(
                EC.presence_of_element_located((By.XPATH, channel_xpath)))
        except Exception:
            print(f"Channel {channel_num} not found in search results")
            return None

        driver.execute_script("arguments[0].click();", channel_element)
        time.sleep(3)

        if not verify_page(driver):
            print("Error: Page navigation failed")
            return None

        channel_id = channel_element.get_attribute("data-channel-id")
        if not channel_id:
            print(f"Could not find channel ID for channel {channel_num}")
            return []

        if not scroll_horizontally(driver):
            return []
        time.sleep(1)

        program_elements = WebDriverWait(driver, 10).until(
            EC.presence_of_all_elements_located(
                (By.CSS_SELECTOR,
                 f"div.cmp-channelguide__channel-tile[data-channel-id='{channel_id}']")))

        programs = []
        for program in program_elements:
            try:
                # A missing class attribute is treated as an empty class list.
                if 'empty-episode-time' in (program.get_attribute('class') or ''):
                    continue

                title_attr = program.get_attribute('title')
                if not title_attr:
                    continue

                # Only the first two '|'-separated fields are read below, but
                # at least three fields are required.
                parts = [p.strip() for p in title_attr.split('|')]
                if len(parts) < 3:
                    continue

                show_name = parts[0].replace('Title:', '').strip()
                time_slot = parts[1].replace('Time:', '').strip()
                if not time_slot or not show_name:
                    continue

                start, end = convert_time_format(time_slot)
                if not start or not end:
                    continue

                programs.append({
                    "title": show_name,
                    "start": start,
                    "end": end,
                })
                print(f"Found: {channel_map[channel_num]} - {show_name} ({time_slot})")
            except Exception as e:
                print(f"Skipping program on channel {channel_num} due to error: {str(e)}")
                continue
        return programs
    except Exception as e:
        print(f"Could not process channel {channel_num}: {str(e)}")
        return []
    finally:
        # Reset the search UI for the next channel; failures are ignored.
        try:
            search_box = driver.find_element(
                By.CSS_SELECTOR, "#channel-guide-search")
            search_box.clear()
            search_box.send_keys(Keys.ESCAPE)
            time.sleep(1)
        except Exception:
            pass


def process_channels_by_search(driver, channel_map):
    """Run ``search_and_process_channel`` for every entry of *channel_map*.

    Returns:
        ``(channels_data, found_channels)``: programmes keyed by channel name
        and the set of channel numbers that yielded at least one programme.

    Stops early when ``stop_flag`` is set, or when a channel returns ``None``
    and the guide page no longer verifies.
    """
    channels_data = {}
    found_channels = set()
    print(f"Processing {len(channel_map)} channels via search...")

    for channel_num, channel_name in channel_map.items():
        if stop_flag:
            break

        print(f"\nSearching for channel #{channel_num} ({channel_name})...")
        programs = search_and_process_channel(driver, channel_num, channel_map)

        if programs is None:
            if not verify_page(driver):
                print("Error: Page has changed, stopping channel processing")
                break
            continue
        elif programs:
            channels_data[channel_name] = programs
            found_channels.add(channel_num)

    return channels_data, found_channels


def _write_pretty_xml(root, path):
    """Serialise *root*, pretty-print it and write it to *path* as UTF-8."""
    xml_bytes = ET.tostring(root, encoding="utf-8", xml_declaration=True)
    pretty_xml = parseString(xml_bytes).toprettyxml()
    with open(path, "w", encoding="utf-8") as f:
        f.write(pretty_xml)


def generate_epg(channels_data):
    """Write *channels_data* to ``dishtv_epg.xml`` in XMLTV form.

    Args:
        channels_data: ``{channel_id: [{"title", "start", "end"}, ...]}``.

    Returns:
        ``True`` when a non-empty EPG file was written, ``False`` otherwise.
        On failure, whatever was built so far is written to
        ``dishtv_epg_partial.xml`` (best effort).

    All ``<channel>`` elements are placed before the ``<programme>``
    elements, the order required by the XMLTV DTD.
    """
    if not channels_data:
        print("Warning: No channel data provided for EPG generation")
        return False

    print("\nStarting EPG XML generation...")
    success = False
    channel_count = 0
    program_count = 0
    root = None
    output_file = "dishtv_epg.xml"
    required_fields = ('title', 'start', 'end')

    try:
        root = ET.Element("tv", {
            "generator-info-name": "DishTV EPG Generator",
            "generator-info-url": "",
        })

        for channel_id, programs in channels_data.items():
            try:
                if not channel_id or not programs:
                    continue

                channel_elem = ET.Element("channel", {"id": str(channel_id)})
                ET.SubElement(channel_elem, "display-name").text = str(channel_id)
                # The first `channel_count` children of root are channels, so
                # inserting at that index keeps every channel ahead of the
                # programmes appended below.
                root.insert(channel_count, channel_elem)
                channel_count += 1

                valid_programs = 0
                for program in programs:
                    try:
                        if not isinstance(program, dict):
                            continue
                        if not all(field in program for field in required_fields):
                            continue
                        if not all(program[field] for field in required_fields):
                            continue

                        program_attrs = {
                            "start": str(program["start"]),
                            "stop": str(program["end"]),
                            "channel": str(channel_id),
                        }
                        programme = ET.SubElement(root, "programme", program_attrs)
                        ET.SubElement(programme, "title").text = str(program["title"])
                        program_count += 1
                        valid_programs += 1
                    except Exception as prog_error:
                        print(f"Skipping invalid program on channel {channel_id}: {str(prog_error)}")
                        continue

                # Drop channels that ended up without any usable programme.
                if not valid_programs:
                    root.remove(channel_elem)
                    channel_count -= 1
            except Exception as channel_error:
                print(f"Error processing channel {channel_id}: {str(channel_error)}")
                continue

        if channel_count > 0 and program_count > 0:
            try:
                _write_pretty_xml(root, output_file)
                print(f"Successfully generated EPG with {channel_count} channels and {program_count} programs")
                success = True
                if os.path.exists(output_file) and os.path.getsize(output_file) > 0:
                    print(f"EPG file saved to {output_file}")
                else:
                    success = False
                    print("Warning: EPG file appears to be empty")
            except IOError as file_error:
                print(f"Failed to write EPG file: {str(file_error)}")
                success = False
        else:
            print("Warning: No valid EPG data to generate")
    except Exception as e:
        print(f"Critical error during EPG generation: {str(e)}")
        success = False

    if not success and root is not None:
        fallback_file = "dishtv_epg_partial.xml"
        try:
            _write_pretty_xml(root, fallback_file)
            print(f"Saved partial EPG data to {fallback_file}")
        except Exception as fallback_error:
            print(f"Failed to save partial data: {str(fallback_error)}")

    return success


def scrape_dishtv():
    """Scrape the DishTV guide for all configured channels and write the EPG.

    Reads the channel list from ``dishtv.config.xml``, loads the guide page
    (up to three attempts), processes every channel and calls
    ``generate_epg`` when any programme data was collected.

    Returns:
        The collected ``{channel_name: programmes}`` dict, or ``None`` when
        scraping failed or was stopped before the page was loaded.

    The browser is always quit on the way out, including after Ctrl+C.
    """
    channel_map = get_channel_mapping("dishtv.config.xml")
    target_ids = set(channel_map.keys())
    print(f"Targeting {len(target_ids)} channels")

    driver = setup_driver()
    try:
        # Verify Chrome timezone
        print("Setting Chrome timezone to Asia/Kolkata...")
        timezone = driver.execute_script(
            "return Intl.DateTimeFormat().resolvedOptions().timeZone")
        print(f"Chrome timezone: {timezone} (should be Asia/Kolkata)")

        # Additional verification
        current_time = driver.execute_script("return new Date().toString()")
        print(f"Browser current time: {current_time}")

        for attempt in range(3):
            if stop_flag:
                return None
            try:
                print(f"Loading page (attempt {attempt + 1})")
                driver.get("https://www.dishtv.in/channel-guide.html")
                WebDriverWait(driver, 30).until(
                    EC.presence_of_element_located(
                        (By.CSS_SELECTOR, "#channel-guide-search")))
                break
            except Exception as e:
                print(f"Page load failed: {str(e)}")
                if attempt == 2:
                    raise
                time.sleep(5)

        channels_data, found_channels = process_channels_by_search(
            driver, channel_map)

        missing = target_ids - found_channels
        if missing:
            print(f"\nWarning: Could not find {len(missing)} channels:")
            for channel_id in missing:
                print(f"- {channel_map[channel_id]} ({channel_id})")

        # Only announce the EPG when generate_epg reports that it wrote one.
        if channels_data and generate_epg(channels_data):
            print(f"\nGenerated EPG with {len(channels_data)} channels and {sum(len(p) for p in channels_data.values())} programs")

        return channels_data
    except Exception as e:
        print(f"Error during scraping: {str(e)}")
        return None
    finally:
        # Quit unconditionally so Chrome is not left running after Ctrl+C.
        # Errors while quitting are ignored.
        try:
            driver.quit()
        except Exception:
            pass


def main():
    """Run the scraper, then wait for Enter or a 60-second auto-close."""
    print("DishTV EPG Scraper (Search + Horizontal Scrolling)")
    print("=================================================")

    try:
        result = scrape_dishtv()
        if result:
            print("\nOperation completed successfully!")
        else:
            print("\nOperation completed with some channels missing")
    except KeyboardInterrupt:
        print("\nOperation cancelled by user")
    except Exception as e:
        print(f"\nError: {str(e)}")

    # Daemon thread: it ends the process via os._exit after the countdown
    # and does not keep the interpreter alive by itself.
    timer_thread = threading.Thread(target=countdown_timer, args=(60,))
    timer_thread.daemon = True
    timer_thread.start()

    try:
        input("\nPress Enter to exit immediately or wait for automatic closure...")
        os._exit(0)
    except Exception:
        # e.g. EOFError when stdin is not available.
        pass


if __name__ == "__main__":
    main()