#!/usr/bin/env python3 """ Unified SonyMax2 + SonyMax HD + SET Asia EPG Scraper with Proper XML Formatting """ from selenium import webdriver from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from bs4 import BeautifulSoup import xml.etree.ElementTree as ET import xml.sax.saxutils as saxutils # ADDED for proper XML escaping from datetime import datetime, timedelta import pytz from pathlib import Path import time import threading import sys import signal import requests import os import urllib.parse stop_flag = False REQUEST_TIMEOUT = 30 DEFAULT_PROGRAM_DURATION = timedelta(hours=1) LAST_PROGRAM_DURATION = timedelta(hours=4) INDIA_TZ = pytz.timezone('Asia/Kolkata') UTC_TZ = pytz.utc def signal_handler(sig, frame): global stop_flag print("\nCtrl+C detected. Stopping gracefully...") stop_flag = True sys.exit(0) signal.signal(signal.SIGINT, signal_handler) def countdown_timer(seconds): for i in range(seconds, 0, -1): if stop_flag: return print(f"\rClosing in {i} seconds...", end="") time.sleep(1) print("\nClosing program automatically...") os._exit(0) def get_channel_mapping(): return { "sonymax2": { "xmltv_id": "Sony Max 2.in", "display_name": "Sony Max 2 (India)", "site_url": "https://www.sonymax2.tv/en_in/tv-guide" }, "sonymaxhd": { "xmltv_id": "Sony Max HD.in", "display_name": "Sony Max HD (India)", "site_url": "https://www.sonymax.tv/en_in/schedule" }, "setasia": { "xmltv_id": "SET.us", "display_name": "Sony Entertainment Television Asia", "site_url": "https://www.ontvtonight.com/ca/guide/listings/channel/69023680/sony-entertainment-television-asia-set-asia-international.html" } } def parse_time(time_str, date_obj): try: time_str = time_str.strip().upper() if "AM" in time_str or "PM" in time_str: time_part = time_str.replace("AM", "").replace("PM", "").strip() if ":" not in time_part: time_part += ":00" time_format = "%I:%M %p" else: time_format = "%H:%M" time_obj = datetime.strptime(time_str, time_format).time() combined = datetime.combine(date_obj.date(), time_obj) return INDIA_TZ.localize(combined) except Exception as e: print(f"Time parsing error for '{time_str}': {e}") return None def scrape_sonymax2_schedule(): options = Options() options.add_argument('--headless') options.add_argument('--no-sandbox') options.add_argument('--disable-dev-shm-usage') all_programs = [] seen = set() try: driver = webdriver.Chrome(options=options) driver.get("https://www.sonymax2.tv/en_in/tv-guide") time.sleep(5) today = datetime.now(INDIA_TZ) tomorrow = today + timedelta(days=1) all_programs += get_programs_for_day(driver, today, seen) try: tomorrow_tab = driver.find_element(By.XPATH, "//li[@data-id and not(contains(@class, 'active'))]") tomorrow_tab.click() time.sleep(3) all_programs += get_programs_for_day(driver, tomorrow, seen) except Exception as e: print(f"Error switching to tomorrow's tab: {e}") except Exception as e: print(f"Error during SonyMax2 scraping: {e}") finally: driver.quit() return all_programs def get_programs_for_day(driver, date_obj, seen_programs): programs = [] try: WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.ID, "schedule"))) soup = BeautifulSoup(driver.page_source, "html.parser") container = soup.find("div", id="schedule") for item in container.find_all("li"): try: time_element = item.find("h4") if not time_element: continue time_text = ''.join([t for t in time_element.contents if isinstance(t, str)]).strip() title = time_element.find("span").get_text(strip=True) if time_element.find("span") else "N/A" key = f"{date_obj.date()}-{time_text}-{title}" if key in seen_programs: continue seen_programs.add(key) start = parse_time(time_text, date_obj) if not start: continue print(f"Time: {time_text} | Title: {title}") programs.append({"title": title, "start": start, "genre": "Entertainment"}) except Exception as e: print(f"Error parsing item: {e}") except Exception as e: print(f"Error loading schedule: {e}") return programs def scrape_sonymaxhd_schedule(): options = Options() options.add_argument('--headless') options.add_argument('--disable-gpu') shows = [] try: driver = webdriver.Chrome(options=options) driver.get("https://www.sonymax.tv/en_in/schedule") time.sleep(10) WebDriverWait(driver, 30).until(EC.presence_of_element_located((By.ID, "liSchedule"))) soup = BeautifulSoup(driver.page_source, "html.parser") container = soup.find("ul", id="liSchedule") today = datetime.now(INDIA_TZ) for li in container.find_all("li"): time_tag = li.find("h4") if not time_tag: continue time_str = ''.join([t for t in time_tag.contents if isinstance(t, str)]).strip() title = time_tag.find("span").text.strip() if time_tag.find("span") else "N/A" desc = li.find("p").text.strip() if li.find("p") else "Entertainment" start = parse_time(time_str, today) if not start: continue shows.append({"title": title, "start": start, "genre": desc}) except Exception as e: print(f"Error during SonyMax HD scraping: {e}") finally: driver.quit() return shows def scrape_setasia_schedule(): headers = {"User-Agent": "Mozilla/5.0"} base_url = "https://www.ontvtonight.com/ca/guide/listings/channel/69023680/sony-entertainment-television-asia-set-asia-international.html?dt=" eastern = pytz.timezone("America/Toronto") gmt = pytz.timezone("GMT") all_programs = [] # Fetch today's full schedule for offset in range(2): # 0 = today, 1 = tomorrow date = datetime.now() + timedelta(days=offset) formatted_date = date.strftime("%Y-%m-%d") url = base_url + formatted_date print(f"\nGrabbing SET Asia schedule for {formatted_date}...") response = requests.get(url, headers=headers) soup = BeautifulSoup(response.content, "html.parser") table = soup.find("table", class_="table table-hover") if table: rows = table.find("tbody").find_all("tr") count = 0 # to limit tomorrow's programs for row in rows: cells = row.find_all("td") if len(cells) < 2: continue local_time_str = cells[0].get_text(strip=True) title_cell = cells[1] title = title_cell.get_text(" ", strip=True) # Extract detail URL if available detail_link = None a_tag = title_cell.find("a") if a_tag and a_tag.get("href"): detail_link = a_tag["href"] if not detail_link.startswith("http"): detail_link = urllib.parse.urljoin("https://www.ontvtonight.com", detail_link) if "(" in title and title.strip().endswith(")"): title = title.rsplit("(", 1)[0].strip() dt_str = f"{formatted_date} {local_time_str}" try: dt_local = datetime.strptime(dt_str, "%Y-%m-%d %I:%M %p") dt_local = eastern.localize(dt_local) dt_gmt = dt_local.astimezone(gmt) except Exception: continue # Get program details if link exists desc = "No description available" if detail_link: try: details = get_program_details(detail_link) desc = details.get('description', "No description available") except Exception as e: print(f"Error getting details for {title}: {e}") desc = "No description available" all_programs.append({ "title": title, "start": dt_gmt, "genre": "Entertainment", "desc": desc }) if offset == 1: # If fetching tomorrow's schedule count += 1 if count >= 3: # Only grab first 3 programs break return all_programs def get_program_details(detail_url): import requests from bs4 import BeautifulSoup headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" } try: response = requests.get(detail_url, headers=headers, timeout=10) if response.status_code != 200: print(f"Warning: Failed to fetch details from {detail_url} (Status {response.status_code})") return { 'description': "No description available", 'cast': [], 'crew': [] } soup = BeautifulSoup(response.content, "html.parser") # Default details details = { 'description': "No description available", 'cast': [], 'crew': [] } # Try to find Description about_header = soup.find("h3", class_="thin", string="About this Broadcast") if about_header: about_div = about_header.find_next("div", class_="tvbody") if about_div: desc_para = about_div.find("p") if desc_para: details['description'] = desc_para.get_text(strip=True) # Try to find Cast cast_header = soup.find("h3", class_="thin", string="Cast & Crew") if cast_header: cast_div = cast_header.find_next("div", class_="tvbody") if cast_div: cast_items = cast_div.find_all("div", class_="single-cast") for item in cast_items: head = item.find("div", class_="single-cast-head") if not head: continue strong_tag = head.find("strong") if strong_tag: actor_name = strong_tag.get_text(strip=True) role_info = head.get_text(strip=True).replace(actor_name, "").strip() details['cast'].append({ 'name': actor_name, 'role': role_info if role_info else None }) return details except Exception as e: print(f"Error fetching program details from {detail_url}: {e}") return { 'description': "No description available", 'cast': [], 'crew': [] } def process_program_times(programs): programs.sort(key=lambda x: x["start"]) result = [] for i, prog in enumerate(programs): end = programs[i+1]["start"] if i + 1 < len(programs) else prog["start"] + LAST_PROGRAM_DURATION if end < prog["start"]: end += timedelta(days=1) result.append({ "title": prog["title"], "start": prog["start"].astimezone(UTC_TZ).strftime("%Y%m%d%H%M%S +0000"), "end": end.astimezone(UTC_TZ).strftime("%Y%m%d%H%M%S +0000"), "genre": prog["genre"], "desc": prog.get("desc", "No description available") }) return result def generate_epg(channels_data): print("\nGenerating EPG...") output_dir = Path(__file__).parent / "tempest_config" / "epg" output_file = output_dir / "sony_epg.xml" output_dir.mkdir(parents=True, exist_ok=True) xml_content = '\n' xml_content += '\n' for channel in channels_data.values(): channel_line = f' ' channel_line += f'{saxutils.escape(channel["display_name"])}' channel_line += '' xml_content += channel_line + '\n' for channel in channels_data.values(): for prog in channel['programs']: if not all(k in prog for k in ['title', 'start', 'end']): continue programme_line = f' ' programme_line += f'{saxutils.escape(prog["title"])}' programme_line += f'{saxutils.escape(prog["desc"])}' programme_line += f'{saxutils.escape(prog["genre"])}' programme_line += '' xml_content += programme_line + '\n' xml_content += '' with open(output_file, "w", encoding="utf-8") as f: f.write(xml_content) print(f"✅ EPG written to {output_file}") def scrape_all(): channel_map = get_channel_mapping() result = {} print("\nScraping Sony Max 2...") max2_programs = scrape_sonymax2_schedule() result['sonymax2'] = { **channel_map['sonymax2'], 'programs': process_program_times(max2_programs) } print("\nScraping Sony Max HD...") maxhd_programs = scrape_sonymaxhd_schedule() result['sonymaxhd'] = { **channel_map['sonymaxhd'], 'programs': process_program_times(maxhd_programs) } print("\nScraping SET Asia...") setasia_programs = scrape_setasia_schedule() result['setasia'] = { **channel_map['setasia'], 'programs': process_program_times(setasia_programs) } generate_epg(result) if __name__ == "__main__": print("\nUnified Sony Max EPG Scraper") print("==============================") start = time.time() scrape_all() print(f"\nFinished in {time.time() - start:.2f} seconds") threading.Thread(target=countdown_timer, args=(10,), daemon=True).start() try: input("\nPress Enter to exit immediately or wait...") os._exit(0) except: pass