"""Scrape programme listings from the TataPlay web app into an XMLTV-style file.

The channel list is read from ``tempest_config/tata.config.xml`` next to this
script (``<channel site_id=... xmltv_id=...>`` elements).  Each channel is
looked up through the site's search box with a Selenium-driven Chrome, its
schedule is read from the page, and the result is written to
``tempest_config/epg/tataplay_epg.xml``.
"""

import os
import signal
import sys
import threading
import time
import xml.etree.ElementTree as ET
from datetime import datetime, date
from datetime import timedelta
from pathlib import Path
from xml.dom.minidom import parseString

import pytz
from selenium import webdriver
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

BASE_URL = "https://watch.tataplay.com"
TIMEZONE_ID = "Asia/Kolkata"
SEARCH_BOX_SELECTOR = "#searchTxt"

# Timestamp layout used for programme start/stop values (always UTC).
XMLTV_TIME_FORMAT = "%Y%m%d%H%M%S +0000"
_XMLTV_STAMP = "%Y%m%d%H%M%S"

# Clock layouts seen in schedule text: "11:30 pm" and the shorter "11 pm".
_CLOCK_FORMATS = ("%Y%m%d %I:%M %p", "%Y%m%d %I %p")

# Global flag for Ctrl+C handling
stop_flag = False


def signal_handler(sig, frame):
    """Handle SIGINT: record the stop request and exit the process.

    ``sys.exit`` raises ``SystemExit`` in the main thread, so ``finally``
    blocks further up the stack (notably the one that closes the browser)
    still run.  Code in this module therefore catches ``Exception`` rather
    than using bare ``except:``, which would swallow that ``SystemExit``.
    """
    global stop_flag
    print("\nCtrl+C detected. Stopping gracefully...")
    stop_flag = True
    sys.exit(0)


signal.signal(signal.SIGINT, signal_handler)


def setup_driver():
    """Start Chrome with an India timezone/locale and return the driver.

    Returns:
        A ``webdriver.Chrome`` instance with the timezone and locale
        overridden through the DevTools protocol.

    Raises:
        Whatever Selenium raises if Chrome cannot be started or the overrides
        cannot be applied.  In the latter case the browser is closed first.
    """
    options = webdriver.ChromeOptions()
    options.add_argument("--timezone=Asia/Kolkata")
    options.add_experimental_option("prefs", {
        "timezone": "Asia/Kolkata",
        "intl.accept_languages": "en-IN",
    })
    for argument in (
        "--disable-notifications",
        "--disable-blink-features=AutomationControlled",
        "--window-size=1920,1080",
        "--disable-dev-shm-usage",
        "--no-sandbox",
        "--log-level=3",
    ):
        options.add_argument(argument)
    options.add_experimental_option('excludeSwitches', ['enable-logging'])

    driver = webdriver.Chrome(service=ChromeService(), options=options)
    try:
        driver.execute_cdp_cmd("Emulation.setTimezoneOverride", {"timezoneId": TIMEZONE_ID})
        driver.execute_cdp_cmd("Emulation.setLocaleOverride", {"locale": "en-IN"})
    except Exception:
        # Do not leave an orphaned browser behind if the overrides fail.
        driver.quit()
        raise
    return driver


def load_cookies(driver):
    """Open the site and replace its cookies with a fixed preset.

    The page has to be loaded first because ``add_cookie`` is called for the
    site's own domain.  A cookie that cannot be added is reported and skipped.
    """
    cookies = [
        {'name': 'showOnBoarding', 'value': 'false', 'domain': 'watch.tataplay.com', 'path': '/'},
        {'name': 'hide_splash', 'value': 'true', 'domain': 'watch.tataplay.com', 'path': '/'},
        {'name': 'TYPE-OF-USER', 'value': 'REGISTERED', 'domain': '.tataplay.com', 'path': '/'},
        {'name': 'APP-LANGUAGE', 'value': 'English', 'domain': '.tataplay.com', 'path': '/'},
        {'name': 'deviceRegLimit', 'value': '10', 'domain': 'watch.tataplay.com', 'path': '/'},
        {'name': 'device_id', 'value': '56a9bb511b2e25a154589fc687e84e8e', 'domain': 'watch.tataplay.com', 'path': '/'},
        {'name': 'tataplay_session', 'value': '1', 'domain': '.tataplay.com', 'path': '/'},
    ]

    driver.get(BASE_URL)
    time.sleep(1)
    driver.delete_all_cookies()

    for cookie in cookies:
        try:
            driver.add_cookie(cookie)
        except Exception as e:
            print(f"Could not add cookie {cookie['name']}: {str(e)}")

    print("✓ Cookies loaded")


def dismiss_popups(driver):
    """Close the Star, language and onboarding popups when they are shown.

    Each popup is optional: it is waited for up to three seconds and any
    failure (typically the wait timing out) is ignored so the remaining
    popups are still tried.
    """
    # Star channels popup
    try:
        WebDriverWait(driver, 3).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, ".popupOuter:not(.side-menu-language)"))
        )
        driver.find_element(By.CSS_SELECTOR, ".CloseBtn").click()
        print("✓ Closed Star popup")
        time.sleep(1)
    except Exception:
        pass

    # Language popup
    try:
        WebDriverWait(driver, 3).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, ".popupOuter.side-menu-language"))
        )
        driver.find_element(By.CSS_SELECTOR, ".ContinueBtn").click()
        print("✓ Closed Language popup")
        time.sleep(1)
    except Exception:
        pass

    # Onboarding popup
    try:
        WebDriverWait(driver, 3).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, ".popupSec.onboardingSec"))
        )
        done_btn = WebDriverWait(driver, 3).until(
            EC.element_to_be_clickable(
                (By.XPATH, "//a[contains(@class, 'skp_btn') and contains(text(), 'DONE')]"))
        )
        driver.execute_script("arguments[0].scrollIntoView();", done_btn)
        time.sleep(0.5)
        # Click through JavaScript rather than WebElement.click().
        driver.execute_script("arguments[0].click();", done_btn)
        print("✓ Closed Onboarding popup")
        time.sleep(1)
    except Exception:
        pass


def initialize_session(driver):
    """Load cookies, open the "my box" page and wait until search is usable.

    Raises:
        selenium.common.exceptions.TimeoutException: if the page does not
            finish loading or the search icon never appears.
    """
    load_cookies(driver)
    driver.get("https://watch.tataplay.com/my-box?pageType=my-box")
    time.sleep(3)
    WebDriverWait(driver, 30).until(
        lambda d: d.execute_script("return document.readyState") == "complete")
    dismiss_popups(driver)
    WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.CSS_SELECTOR, ".header-search-icon")))
    print("✓ Ready for channel search")


def countdown_timer(seconds):
    """Print a per-second countdown, then terminate the process.

    Returns early, without terminating, if ``stop_flag`` becomes true.

    Args:
        seconds: Number of seconds to count down from.
    """
    for i in range(seconds, 0, -1):
        if stop_flag:
            return
        # flush: the line has no newline, so it would otherwise sit in the
        # output buffer instead of updating once per second.
        print(f"\rClosing in {i} seconds...", end="", flush=True)
        time.sleep(1)
    print("\nClosing program automatically...")
    sys.stdout.flush()  # os._exit does not flush buffered output
    os._exit(0)


def get_channel_mapping():
    """Read the channel list from ``tempest_config/tata.config.xml``.

    Returns:
        A dict mapping each ``<channel>`` element's ``site_id`` attribute to
        its ``xmltv_id`` attribute.  Elements missing either attribute are
        skipped.  An empty dict is returned (and the error printed) when the
        file is missing or cannot be parsed.
    """
    # Define path to config file in tempest_config subdirectory
    config_path = Path(__file__).parent / "tempest_config" / "tata.config.xml"

    try:
        if not config_path.exists():
            raise FileNotFoundError(f"Config file not found at: {config_path}")
        tree = ET.parse(config_path)
    except (OSError, ET.ParseError) as e:
        print(f"Error loading channel mapping: {str(e)}")
        return {}

    mapping = {}
    for channel in tree.findall(".//channel"):
        site_id = channel.get("site_id")
        xmltv_id = channel.get("xmltv_id")
        # Without a site_id there is nothing to search for, and without an
        # xmltv_id the scraped data could not be written out.
        if site_id and xmltv_id:
            mapping[site_id] = xmltv_id
    return mapping


def _parse_clock(day_stamp, clock_text):
    """Parse clock text such as "11:30 pm" or "11 pm" on the given YYYYMMDD day.

    Raises:
        ValueError: if the text matches none of the known layouts.
    """
    last_error = None
    for layout in _CLOCK_FORMATS:
        try:
            return datetime.strptime(f"{day_stamp} {clock_text}", layout)
        except ValueError as error:
            last_error = error
    raise last_error


def convert_time_format(time_str):
    """Convert a "start - end" clock range into IST display and UTC stamps.

    Both times are placed on today's date in the Asia/Kolkata timezone; an
    end time earlier than the start is taken to be on the following day.
    Note that a slot lying entirely on another day is still dated today.

    Args:
        time_str: Text such as ``"11:30 pm - 12:15 am"`` or
            ``"01:34 am-02:30 am"``.  Non-breaking and repeated spaces are
            tolerated.

    Returns:
        ``(start_ist, end_ist, (start_utc, end_utc))`` where the IST values
        are formatted ``"%I:%M %p"`` and the UTC values
        ``"%Y%m%d%H%M%S +0000"``.  On empty, separator-less or unparseable
        input the result is ``(None, None, None)``; parse failures are also
        printed.
    """
    try:
        if not time_str:
            return None, None, None

        # str.split() with no argument splits on any Unicode whitespace, so
        # this turns non-breaking spaces into plain ones and collapses runs.
        cleaned = " ".join(time_str.split())

        if "-" not in cleaned:
            return None, None, None
        start_str, end_str = (part.strip() for part in cleaned.split("-", 1))

        # Get today's date in India timezone
        india_tz = pytz.timezone(TIMEZONE_ID)
        today_date = datetime.now(india_tz).strftime("%Y%m%d")

        start_naive = _parse_clock(today_date, start_str)
        end_naive = _parse_clock(today_date, end_str)

        # Handle overnight programs (end time is earlier than start time)
        if end_naive < start_naive:
            end_naive += timedelta(days=1)

        start = india_tz.localize(start_naive)
        end = india_tz.localize(end_naive)

        # Convert to GMT for EPG format
        start_gmt = start.astimezone(pytz.utc).strftime(XMLTV_TIME_FORMAT)
        end_gmt = end.astimezone(pytz.utc).strftime(XMLTV_TIME_FORMAT)

        # Return both IST (for display) and GMT (for XML)
        return start.strftime("%I:%M %p"), end.strftime("%I:%M %p"), (start_gmt, end_gmt)
    except Exception as e:
        print(f"Time conversion error for '{time_str}': {str(e)}")
        return None, None, None


def _build_program(title, time_slot):
    """Return a programme dict for ``title``/``time_slot``, or ``None``.

    ``None`` is returned when either value is empty or the time range cannot
    be converted.  This keeps callers from unpacking the ``None`` that
    ``convert_time_format`` returns in place of the UTC pair on failure.
    """
    if not title or not time_slot:
        return None

    start_ist, end_ist, utc_range = convert_time_format(time_slot)
    if not utc_range:
        return None

    start_gmt, end_gmt = utc_range
    if not (start_gmt and end_gmt):
        return None

    return {
        "title": title,
        "start": start_gmt,
        "end": end_gmt,
        "start_ist": start_ist,
        "end_ist": end_ist
    }


def safe_get_text(element, selector):
    """Return the text of the first descendant matching ``selector``.

    Returns ``None`` when the lookup fails for any reason (for example, no
    such descendant exists).
    """
    try:
        elem = element.find_element(By.CSS_SELECTOR, selector)
        return elem.text if elem else None
    except Exception:
        return None


def _clear_search(driver):
    """Best-effort: empty the search box and send Escape to it.

    Failures are ignored because the box is not always present.
    """
    try:
        search_box = driver.find_element(By.CSS_SELECTOR, SEARCH_BOX_SELECTOR)
        search_box.clear()
        search_box.send_keys(Keys.ESCAPE)
        time.sleep(1)
    except Exception:
        pass


def search_channel(driver, channel_num):
    """Search for a channel number and open its schedule page.

    Args:
        driver: Active Selenium driver, already on the site.
        channel_num: Channel number to type into the search box.

    Returns:
        ``True`` once the channel schedule wrapper is present, ``False`` if
        any step fails (the error is printed and the search box cleared).
    """
    try:
        # Clear any existing search
        _clear_search(driver)

        # Click search icon with retries
        for attempt in range(3):
            try:
                search_icon = WebDriverWait(driver, 10).until(
                    EC.element_to_be_clickable((By.CSS_SELECTOR, "img.header-search-icon")))
                driver.execute_script("arguments[0].click();", search_icon)
                time.sleep(2)
                break
            except Exception:
                if attempt == 2:
                    raise
                print(f"Retrying search icon click for channel {channel_num}...")
                time.sleep(2)

        # Enter channel number one character at a time, pausing between keys
        search_box = WebDriverWait(driver, 15).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, SEARCH_BOX_SELECTOR)))
        search_box.clear()
        for char in str(channel_num):
            search_box.send_keys(char)
            time.sleep(0.1)
        time.sleep(2)

        # Wait for and click first result
        first_result = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable(
                (By.CSS_SELECTOR, ".auto_search_outer .search-list:first-child a")))
        driver.execute_script("arguments[0].click();", first_result)
        time.sleep(3)

        # Handle Best Match section if it exists
        try:
            WebDriverWait(driver, 5).until(
                EC.presence_of_element_located((By.XPATH, "//h3[contains(., 'Best Match')]")))
            channel_xpath = f"//div[contains(@class, 'channel-no') and contains(., '{channel_num}')]/ancestor::a"
            best_match_channel = WebDriverWait(driver, 5).until(
                EC.element_to_be_clickable((By.XPATH, channel_xpath)))
            driver.execute_script("arguments[0].click();", best_match_channel)
            time.sleep(3)
        except Exception:
            pass

        # Final verification
        WebDriverWait(driver, 15).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, ".channel-schedule-wrapper")))
        return True

    except Exception as e:
        print(f"Error searching for channel {channel_num}: {str(e)}")
        # Try to clear search and continue
        _clear_search(driver)
        return False


def get_current_programs(driver):
    """Read the programme(s) currently airing from the open channel page.

    The channel description container is tried first; if it yields no usable
    programme, cards carrying a ``.live-text-wrapper`` marker are used.

    Returns:
        A list of programme dicts with keys ``title``, ``start``, ``end``
        (UTC stamps) and ``start_ist``, ``end_ist``.  Empty when nothing
        could be read.
    """
    try:
        programs = []

        # First try to get from channel description container (with "On Now")
        try:
            channel_desc = WebDriverWait(driver, 5).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, ".channel-description-container")))
            program = _build_program(
                safe_get_text(channel_desc, ".content-title"),
                safe_get_text(channel_desc, ".content-time-duration"),
            )
            if program:
                programs.append(program)
                return programs
        except Exception:
            pass

        # If not found in description, look for "Live" programs in schedule
        try:
            live_cards = driver.find_elements(By.CSS_SELECTOR, ".live-text-wrapper")
        except Exception:
            live_cards = []

        for card in live_cards:
            try:
                # Get the parent card element
                program_card = card.find_element(
                    By.XPATH, "./ancestor::div[contains(@class, 'search-landing-card')]")
                title = safe_get_text(program_card, ".search-result-box-title")
                time_slot = safe_get_text(program_card, ".channel-info")

                if time_slot and "|" in time_slot:
                    # Handle cases with dates: the time range is the last field
                    time_slot = time_slot.split("|")[-1].strip()

                program = _build_program(title, time_slot)
                if program:
                    programs.append(program)
            except Exception:
                continue

        return programs

    except Exception as e:
        print(f"Error getting current programs: {str(e)}")
        return []


def get_full_schedule(driver):
    """Open "View Full Schedule" and collect programmes from every time slot.

    Returns:
        A list of programme dicts (same shape as ``get_current_programs``).
        Empty when the schedule button cannot be clicked or an unexpected
        error occurs.
    """
    try:
        programs = []

        # Click on "View Full Schedule" button
        try:
            schedule_btn = WebDriverWait(driver, 15).until(
                EC.element_to_be_clickable((By.CSS_SELECTOR, ".full-scehdule-btn")))
            driver.execute_script("arguments[0].click();", schedule_btn)
            time.sleep(3)
        except Exception as e:
            print(f"Couldn't click View Full Schedule button: {str(e)}")
            return programs

        # Get all time slot buttons
        time_slots = WebDriverWait(driver, 10).until(
            EC.presence_of_all_elements_located((By.CSS_SELECTOR, ".time-slot-wrapper li")))

        # Process each time slot
        for slot in time_slots:
            try:
                # Click the time slot
                driver.execute_script("arguments[0].click();", slot)
                time.sleep(2)  # Wait for programs to load

                # Get all program cards in this time slot
                program_cards = driver.find_elements(
                    By.CSS_SELECTOR, ".card-section-wrapper > .search-landing-card")

                for card in program_cards:
                    try:
                        program = _build_program(
                            safe_get_text(card, ".search-list-box-title"),
                            safe_get_text(card, ".time-slot span"),
                        )
                        if program:
                            programs.append(program)
                    except Exception as e:
                        print(f"Skipping a program due to error: {str(e)}")
                        continue

            except Exception as e:
                print(f"Error processing time slot: {str(e)}")
                continue

        # Close the schedule view (if needed)
        try:
            close_btn = driver.find_element(By.CSS_SELECTOR, ".close-schedule")
            close_btn.click()
            time.sleep(1)
        except Exception:
            pass

        return programs

    except Exception as e:
        print(f"Error getting full schedule: {str(e)}")
        return []


def _dedupe_and_sort(programs):
    """Drop repeated (title, start, end) entries and sort by start stamp.

    The start stamps are fixed-width ``YYYYMMDDHHMMSS +0000`` strings, so
    string order is chronological order.
    """
    unique_programs = []
    seen = set()
    for prog in programs:
        key = (prog["title"], prog["start"], prog["end"])
        if key not in seen:
            seen.add(key)
            unique_programs.append(prog)
    unique_programs.sort(key=lambda item: item["start"])
    return unique_programs


def _resolve_end(prog, next_prog):
    """Return a usable end stamp for ``prog``.

    A missing end is taken from the next programme's start, or set to four
    hours after the start for the last programme.  An end that is not after
    the start is replaced by start plus one hour.
    """
    start_dt = datetime.strptime(prog["start"].split()[0], _XMLTV_STAMP)

    end = prog.get("end")
    if not end:
        if next_prog is not None:
            # Use start time of next program as end time
            end = next_prog["start"]
        else:
            # Last program of the day - add 4 hours
            end = (start_dt + timedelta(hours=4)).strftime(XMLTV_TIME_FORMAT)

    # Ensure end time is after start time
    end_dt = datetime.strptime(end.split()[0], _XMLTV_STAMP)
    if end_dt <= start_dt:
        # Add 1 hour if end is before start (minimum duration)
        end = (start_dt + timedelta(hours=1)).strftime(XMLTV_TIME_FORMAT)
    return end


def process_channel(driver, channel_num, channel_map):
    """Scrape one channel and return its cleaned-up programme list.

    Args:
        driver: Active Selenium driver.
        channel_num: Channel number (a key of ``channel_map``).
        channel_map: Mapping of channel number to output channel id.

    Returns:
        ``{channel_map[channel_num]: [programme, ...]}``, or ``None`` when
        the channel cannot be found, has no programmes, or an error occurs.
        The search box is cleared afterwards in every case.
    """
    try:
        if not search_channel(driver, channel_num):
            print(f"Channel {channel_num} not found")
            return None

        # First get current programs, then the full schedule (which includes
        # upcoming programs); both always return lists.
        current_programs = get_current_programs(driver)
        full_schedule = get_full_schedule(driver)
        all_programs = current_programs + full_schedule

        if not all_programs:
            print(f"No programs found for channel {channel_num}")
            return None

        unique_programs = _dedupe_and_sort(all_programs)

        # Fix missing/incorrect end times
        fixed_programs = []
        for i, prog in enumerate(unique_programs):
            try:
                has_next = i < len(unique_programs) - 1
                next_prog = unique_programs[i + 1] if has_next else None
                end = _resolve_end(prog, next_prog)

                fixed_programs.append({
                    "title": prog["title"],
                    "start": prog["start"],
                    "end": end,
                    "start_ist": prog.get("start_ist", ""),
                    "end_ist": prog.get("end_ist", "")
                })

                # Print each program found with IST times
                print(f"Found: {channel_map[channel_num]} - {prog['title']} ({prog.get('start_ist', '')} - {prog.get('end_ist', '')})")

            except Exception as e:
                print(f"Skipping program due to error: {str(e)}")
                continue

        print(f"Found {len(fixed_programs)} programs for channel {channel_num}")
        return {
            channel_map[channel_num]: fixed_programs
        }

    except Exception as e:
        print(f"Error processing channel {channel_num}: {str(e)}")
        return None
    finally:
        # Clear search
        _clear_search(driver)


def generate_epg(channels_data):
    """Write the scraped programmes to ``tempest_config/epg/tataplay_epg.xml``.

    All ``<channel>`` elements are emitted before any ``<programme>``
    element, which is the order the XMLTV DTD requires.  Channels without a
    single valid programme are left out.  If the main file cannot be
    produced, whatever was built is written to ``tataplay_epg_partial.xml``
    in the same directory.

    Args:
        channels_data: Mapping of channel id to a list of programme dicts;
            each programme needs non-empty ``title``, ``start`` and ``end``.

    Returns:
        ``True`` when a non-empty EPG file was written, ``False`` otherwise.
    """
    if not channels_data:
        print("Warning: No channel data provided for EPG generation")
        return False

    print("\nStarting EPG XML generation...")

    success = False
    channel_count = 0
    program_count = 0
    root = None
    required_fields = ('title', 'start', 'end')

    # Define output path (tempest_config/epg subdirectory)
    output_dir = Path(__file__).parent / "tempest_config" / "epg"
    output_file = output_dir / "tataplay_epg.xml"

    try:
        # Create directory if it doesn't exist
        output_dir.mkdir(parents=True, exist_ok=True)

        root = ET.Element("tv", {
            "generator-info-name": "TataPlay EPG Generator",
            "generator-info-url": ""
        })

        for channel_id, programs in channels_data.items():
            try:
                if not channel_id or not programs:
                    continue

                programme_elems = []
                for program in programs:
                    try:
                        if not isinstance(program, dict):
                            continue
                        if not all(program.get(field) for field in required_fields):
                            continue

                        programme = ET.Element("programme", {
                            "start": str(program["start"]),
                            "stop": str(program["end"]),
                            "channel": str(channel_id)
                        })
                        ET.SubElement(programme, "title").text = str(program["title"])
                        programme_elems.append(programme)

                    except Exception as prog_error:
                        print(f"Skipping invalid program on channel {channel_id}: {str(prog_error)}")
                        continue

                if not programme_elems:
                    continue

                channel_elem = ET.Element("channel", {"id": str(channel_id)})
                ET.SubElement(channel_elem, "display-name").text = str(channel_id)

                # Channels occupy the first channel_count positions, so
                # inserting there keeps every channel ahead of all programmes.
                root.insert(channel_count, channel_elem)
                root.extend(programme_elems)
                channel_count += 1
                program_count += len(programme_elems)

            except Exception as channel_error:
                print(f"Error processing channel {channel_id}: {str(channel_error)}")
                continue

        if channel_count > 0 and program_count > 0:
            xml_string = ET.tostring(root, encoding="utf-8", xml_declaration=True)
            pretty_xml = parseString(xml_string).toprettyxml()

            try:
                with open(output_file, "w", encoding="utf-8") as f:
                    f.write(pretty_xml)

                print(f"Successfully generated EPG with {channel_count} channels and {program_count} programs")
                print(f"EPG file saved to {output_file.absolute()}")

                if output_file.exists() and output_file.stat().st_size > 0:
                    success = True
                    print("EPG file verification successful")
                else:
                    print("Warning: EPG file appears to be empty")

            except IOError as file_error:
                print(f"Failed to write EPG file: {str(file_error)}")
                success = False
        else:
            print("Warning: No valid EPG data to generate")

    except Exception as e:
        print(f"Critical error during EPG generation: {str(e)}")
        success = False

    if not success and root is not None:
        fallback_file = output_dir / "tataplay_epg_partial.xml"
        try:
            partial_xml = ET.tostring(root, encoding="utf-8", xml_declaration=True)
            with open(fallback_file, "w", encoding="utf-8") as f:
                f.write(parseString(partial_xml).toprettyxml())
            print(f"Saved partial EPG data to {fallback_file}")
        except Exception as fallback_error:
            print(f"Failed to save partial data: {str(fallback_error)}")

    return success


def scrape_tataplay():
    """Scrape every configured channel and generate the EPG file.

    Returns:
        The mapping of channel id to programme list that was collected
        (empty when no channels are configured), or ``None`` if scraping
        failed with an error.  The browser is always closed before
        returning, including after Ctrl+C.
    """
    channel_map = get_channel_mapping()
    target_ids = set(channel_map)
    print(f"Targeting {len(target_ids)} channels")

    if not channel_map:
        # Nothing to scrape, so do not start a browser at all.
        return {}

    driver = setup_driver()

    try:
        initialize_session(driver)
        channels_data = {}
        found_channels = set()

        for channel_num, xmltv_id in channel_map.items():
            if stop_flag:
                break

            print(f"\nProcessing channel #{channel_num} ({xmltv_id})...")
            channel_data = process_channel(driver, channel_num, channel_map)
            if channel_data:
                channels_data.update(channel_data)
                found_channels.add(channel_num)

        missing = target_ids - found_channels
        if missing:
            print(f"\nWarning: Could not find {len(missing)} channels:")
            for channel_id in missing:
                print(f"- {channel_map[channel_id]} ({channel_id})")

        # Only announce the totals when the file was actually written.
        if channels_data and generate_epg(channels_data):
            print(f"\nGenerated EPG with {len(channels_data)} channels and {sum(len(p) for p in channels_data.values())} programs")

        return channels_data

    except Exception as e:
        print(f"Error during scraping: {str(e)}")
        return None
    finally:
        # Always close the browser, also when Ctrl+C set stop_flag;
        # skipping this would leave Chrome running after the script exits.
        try:
            driver.quit()
        except Exception:
            pass


def main():
    """Run the scraper, then wait for Enter or a ten-second countdown."""
    print("TataPlay EPG Scraper")
    print("=====================")

    try:
        result = scrape_tataplay()
        if result:
            print("\nOperation completed successfully!")
        else:
            print("\nOperation completed with some channels missing")
    except KeyboardInterrupt:
        print("\nOperation cancelled by user")
    except Exception as e:
        print(f"\nError: {str(e)}")

    # The daemon thread ends the process when the countdown runs out.
    timer_thread = threading.Thread(target=countdown_timer, args=(10,), daemon=True)
    timer_thread.start()

    try:
        input("\nPress Enter to exit immediately or wait for automatic closure...")
        os._exit(0)
    except EOFError:
        # No interactive stdin: fall through and let the interpreter exit.
        pass


if __name__ == "__main__":
    main()