#!/usr/bin/env python3
"""
TVTick EPG Scraper - Improved Version
Scrapes program schedules from tvtick.com and generates XMLTV EPG format
"""

import requests
from bs4 import BeautifulSoup
import xml.etree.ElementTree as ET
from xml.dom.minidom import parseString
from datetime import datetime, timedelta
import pytz
from pathlib import Path
import time
import re
import threading
import sys
import signal
import random
from time import sleep
import os

# Global flag for Ctrl+C handling
stop_flag = False

# Configuration constants
MAX_RETRIES = 3
INITIAL_RETRY_DELAY = 5
MAX_RETRY_DELAY = 30
REQUEST_TIMEOUT = 30
USER_AGENT = ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
              '(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36')
DEFAULT_PROGRAM_DURATION = timedelta(minutes=30)  # Default duration when end time can't be determined


def signal_handler(sig, frame):
    """Handle Ctrl+C interrupt gracefully."""
    global stop_flag
    print("\nCtrl+C detected. Stopping gracefully...")
    stop_flag = True
    # Note: deliberately no sys.exit() here. Setting stop_flag lets the
    # scraping loops check the flag and wind down cleanly; exiting from the
    # handler would abort mid-request and make those checks unreachable.


signal.signal(signal.SIGINT, signal_handler)


def get_channel_mapping():
    """Get channel mapping from config file with URL name fixes."""
    config_path = Path(__file__).parent / "tempest_config" / "tvtick.config.xml"
    try:
        if not config_path.exists():
            raise FileNotFoundError(f"Config file not found at: {config_path}")

        tree = ET.parse(config_path)
        channel_map = {}

        for channel in tree.findall(".//channel"):
            site_id = channel.get("site_id")
            display_name = channel.text.strip() if channel.text else channel.get("xmltv_id")

            # Build a URL-safe slug from the display name
            url_name = display_name.lower().replace(' ', '-')
            url_name = re.sub(r'[^a-z0-9-]', '', url_name)

            # Manual fixes for known channel URL variations
            if site_id == "20":  # andpictures
                url_name = "and-pictures"
            elif site_id == "8":  # And Pictures HD
                url_name = "and-pictures-hd"
            elif site_id == "1377":  # andXplor HD
                url_name = "and-xplor-hd"

            channel_map[site_id] = {
                'xmltv_id': channel.get("xmltv_id"),
                'display_name': display_name,
                'url_name': url_name
            }

        return channel_map
    except Exception as e:
        print(f"Error loading channel mapping: {str(e)}")
        return {}
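
# Illustrative shape of tempest_config/tvtick.config.xml, inferred from the
# lookups above (site_id and xmltv_id attributes, display name as element text).
# This is an assumption, not a verified copy of the real config:
#
#   <channels>
#     <channel site_id="20" xmltv_id="AndPictures.in">andpictures</channel>
#     <channel site_id="8" xmltv_id="AndPicturesHD.in">And Pictures HD</channel>
#   </channels>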

def parse_datetime(date_str, time_str):
    """Parse combined date and time strings into a timezone-aware datetime."""
    try:
        # Clean and normalize the strings
        date_str = re.sub(r'\s+', ' ', date_str.strip())
        time_str = re.sub(r'\s+', ' ', time_str.strip())

        # Normalize midnight: "%I" only accepts 01-12, so map "00:xx" to "12:xx"
        if time_str.startswith('00:'):
            time_str = '12' + time_str[2:]

        # Parse the date (handles both "Mar" and "March" formats)
        india_tz = pytz.timezone('Asia/Kolkata')
        try:
            date_obj = datetime.strptime(date_str, "%d %b %Y")
        except ValueError:
            try:
                date_obj = datetime.strptime(date_str, "%d %B %Y")
            except ValueError:
                # Fallback to today's date if parsing fails
                date_obj = datetime.now()

        # Parse the time (e.g., "12:46 AM")
        try:
            time_obj = datetime.strptime(time_str, "%I:%M %p").time()
        except ValueError:
            # Fallback to current time if parsing fails
            time_obj = datetime.now().time()

        # Combine date and time
        combined = datetime.combine(date_obj.date(), time_obj)
        return india_tz.localize(combined)
    except Exception as e:
        print(f"DateTime parsing error for '{date_str} {time_str}': {e}")
        return None


def make_request_with_retry(url, headers, max_retries=MAX_RETRIES):
    """Make HTTP request with retry logic and enhanced headers."""
    retry_delay = INITIAL_RETRY_DELAY
    enhanced_headers = headers.copy()
    enhanced_headers.update({
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
        'Connection': 'keep-alive',
        'DNT': '1',
        'Upgrade-Insecure-Requests': '1',
    })

    for attempt in range(max_retries + 1):
        try:
            if stop_flag:
                return None

            if attempt > 0:
                # Jittered exponential backoff between attempts
                actual_delay = retry_delay + random.uniform(-2, 2)
                print(f"Attempt {attempt + 1}/{max_retries + 1}. "
                      f"Waiting {actual_delay:.1f} seconds before retry...")
                sleep(actual_delay)
                retry_delay = min(retry_delay * 2, MAX_RETRY_DELAY)

            response = requests.get(url, headers=enhanced_headers, timeout=REQUEST_TIMEOUT)
            response.raise_for_status()

            # Basic sanity check that we got a real HTML page, not an error stub
            if 'text/html' in response.headers.get('Content-Type', '') and \
                    len(response.text) > 1000:
                return response
            raise requests.exceptions.RequestException("Invalid response content")

        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 404:
                print(f"404 Not Found for {url}")
                return None  # No point retrying a 404
            if attempt < max_retries:
                print(f"HTTP error ({e.response.status_code}) for {url}. Will retry...")
                continue
            raise
        except (requests.exceptions.ConnectionError,
                requests.exceptions.Timeout,
                requests.exceptions.RequestException) as e:
            if attempt < max_retries:
                print(f"Request error ({type(e).__name__}) for {url}. Will retry...")
                continue
            raise

    return None
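
# The parser below assumes tvtick.com schedule pages contain markup roughly
# like this. The sketch is inferred from the CSS selectors used in
# scrape_channel_schedule, not copied from the live site:
#
#   <div class="prog-list">
#     <h2 class="header"><span>12:46 AM</span></h2>
#     <h4 class="text-center">Programme Title</h4>
#     <div class="text-center">
#       <a class="badge bg-info text-dark btnSpaced">#Drama</a>
#     </div>
#     <p class="text-white lh-sm">Episode description...</p>
#   </div>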

def scrape_channel_schedule(channel_id, channel_info):
    """Scrape the schedule for a single channel from tvtick.com."""
    try:
        if stop_flag:
            return []

        headers = {'User-Agent': USER_AGENT}
        base_urls = [
            f"https://tvtick.com/{channel_info['url_name']}/Channel/{channel_id}/Schedule/Today",
            f"https://tvtick.com/{channel_info['url_name']}/Channel/{channel_id}/Schedule"
        ]

        programs = []
        current_date = datetime.now().strftime("%d %b %Y")

        for url in base_urls:
            print(f"Fetching: {url}")
            response = make_request_with_retry(url, headers)
            if not response:
                print(f"Failed to fetch data from {url}")
                continue

            soup = BeautifulSoup(response.text, 'html.parser')

            # Find all program items - these are in divs with class 'prog-list'
            program_items = soup.find_all('div', class_='prog-list')

            for item in program_items:
                try:
                    # Extract time from the header
                    time_el = item.find('h2', class_='header')
                    if not time_el:
                        continue
                    time_str = time_el.find('span').get_text(strip=True)

                    # Extract title
                    title_el = item.find('h4', class_='text-center')
                    if not title_el:
                        continue
                    title = title_el.get_text(strip=True)

                    # Extract genre tags if available
                    genre_el = item.find('div', class_='text-center')
                    genre = ""
                    if genre_el:
                        genre_tags = genre_el.find_all('a', class_='badge bg-info text-dark btnSpaced')
                        genre = ", ".join(g.get_text(strip=True).replace('#', '') for g in genre_tags)

                    # Extract description
                    desc_el = item.find('p', class_='text-white lh-sm')
                    description = desc_el.get_text(strip=True) if desc_el else ""

                    # Parse the datetime
                    start_time = parse_datetime(current_date, time_str)
                    if not start_time:
                        continue

                    programs.append({
                        "title": title,
                        "start": start_time,
                        "genre": genre,
                        "description": description
                    })
                except Exception as e:
                    print(f"Error parsing program item: {e}")
                    continue

        # Remove duplicate programs (same title and start time)
        unique_programs = []
        seen = set()
        for program in programs:
            key = (program["title"], program["start"])
            if key not in seen:
                seen.add(key)
                unique_programs.append(program)

        # Calculate end times: each program runs until the next one starts;
        # the last program gets the default duration.
        if unique_programs:
            unique_programs.sort(key=lambda x: x["start"])

            processed_programs = []
            for i in range(len(unique_programs)):
                start = unique_programs[i]["start"]
                if i < len(unique_programs) - 1:
                    end = unique_programs[i + 1]["start"]
                else:
                    end = start + DEFAULT_PROGRAM_DURATION

                # Convert to UTC for the XMLTV timestamps
                start_gmt = start.astimezone(pytz.utc).strftime("%Y%m%d%H%M%S +0000")
                end_gmt = end.astimezone(pytz.utc).strftime("%Y%m%d%H%M%S +0000")

                processed_programs.append({
                    "title": unique_programs[i]["title"],
                    "start": start_gmt,
                    "end": end_gmt,
                    "genre": unique_programs[i]["genre"],
                    "description": unique_programs[i].get("description", "")
                })

            return processed_programs

        print(f"No programs found in the schedule for {channel_info['display_name']}")
        return []
    except Exception as e:
        print(f"Error scraping {channel_info['display_name']}: {str(e)}")
        return []


def generate_epg(channels_data):
    """Generate an XMLTV format EPG file."""
    if not channels_data:
        print("Warning: No channel data provided for EPG generation")
        return False

    print("\nStarting EPG XML generation...")
    success = False
    channel_count = 0
    program_count = 0
    output_dir = Path(__file__).parent / "tempest_config" / "epg"
    output_file = output_dir / "tvtick_epg.xml"

    try:
        output_dir.mkdir(parents=True, exist_ok=True)

        root = ET.Element("tv", {
            "generator-info-name": "TVTick EPG Generator",
            "generator-info-url": ""
        })

        for channel_id, channel_info in channels_data.items():
            try:
                programs = channel_info['programs']
                if not programs:
                    continue

                channel_elem = ET.SubElement(root, "channel", {"id": channel_info['xmltv_id']})
                ET.SubElement(channel_elem, "display-name").text = channel_info['display_name']
                channel_count += 1

                for program in programs:
                    try:
                        programme = ET.SubElement(root, "programme", {
                            "start": program["start"],
                            "stop": program["end"],
                            "channel": channel_info['xmltv_id']
                        })
                        ET.SubElement(programme, "title", {'lang': 'en'}).text = program["title"]
                        if program.get("genre"):
                            ET.SubElement(programme, "category", {'lang': 'en'}).text = program["genre"]
                        # Emit the scraped description as a standard XMLTV <desc>
                        # element (previously collected but never written out)
                        if program.get("description"):
                            ET.SubElement(programme, "desc", {'lang': 'en'}).text = program["description"]
                        program_count += 1
                    except Exception:
                        continue
            except Exception as e:
                print(f"Error processing channel {channel_info['xmltv_id']}: {str(e)}")
                continue

        if channel_count > 0 and program_count > 0:
            xml_string = ET.tostring(root, encoding="utf-8", xml_declaration=True)
            pretty_xml = parseString(xml_string).toprettyxml()
            with open(output_file, "w", encoding="utf-8") as f:
                f.write(pretty_xml)
            print(f"Successfully generated EPG with {channel_count} channels and {program_count} programs")
            print(f"EPG file saved to {output_file.absolute()}")
            success = True
        else:
            print("Warning: No valid programs found to generate EPG")
    except Exception as e:
        print(f"Error during EPG generation: {str(e)}")
        success = False

    return success


def countdown_timer(seconds):
    """Display countdown timer before automatic exit."""
    for i in range(seconds, 0, -1):
        if stop_flag:
            return
        print(f"\rClosing in {i} seconds...", end="")
        time.sleep(1)
    print("\nClosing program automatically...")
    os._exit(0)


def scrape_tvtick():
    """Main scraping function."""
    channel_map = get_channel_mapping()
    if not channel_map:
        print("Error: No channel mappings found")
        return None

    channels_data = {}
    for site_id, channel_info in channel_map.items():
        if stop_flag:
            break

        print(f"\nScraping {channel_info['display_name']}...")
        programs = scrape_channel_schedule(site_id, channel_info)
        if programs:
            channels_data[site_id] = {
                'xmltv_id': channel_info['xmltv_id'],
                'display_name': channel_info['display_name'],
                'programs': programs
            }
            print(f"Found {len(programs)} programs for {channel_info['display_name']}")
        else:
            print(f"No programs found for {channel_info['display_name']}")

    if channels_data and not stop_flag:
        generate_epg(channels_data)

    return channels_data
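
# Illustrative fragment of the generated tvtick_epg.xml. The structure follows
# generate_epg above; channel IDs and timestamps are made-up placeholders:
#
#   <tv generator-info-name="TVTick EPG Generator" generator-info-url="">
#     <channel id="AndPictures.in">
#       <display-name>andpictures</display-name>
#     </channel>
#     <programme start="20240315191500 +0000" stop="20240315194500 +0000" channel="AndPictures.in">
#       <title lang="en">Programme Title</title>
#       <category lang="en">Drama</category>
#       <desc lang="en">Episode description...</desc>
#     </programme>
#   </tv>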

if __name__ == "__main__":
    print("TVTick EPG Scraper - Improved Version")
    print("====================================")
    print(f"Configuration: Max Retries={MAX_RETRIES}, Initial Delay={INITIAL_RETRY_DELAY}s")
    print(f"Max Delay={MAX_RETRY_DELAY}s, Timeout={REQUEST_TIMEOUT}s\n")

    start_time = time.time()
    result = scrape_tvtick()
    elapsed = time.time() - start_time

    if result:
        print(f"\nOperation completed successfully in {elapsed:.2f} seconds!")
        print(f"Scraped {len(result)} channels")
    else:
        print("\nOperation completed with errors")

    # Start countdown timer in the background
    timer_thread = threading.Thread(target=countdown_timer, args=(10,))
    timer_thread.daemon = True
    timer_thread.start()

    try:
        input("\nPress Enter to exit immediately or wait for automatic closure...")
        os._exit(0)
    except (KeyboardInterrupt, EOFError):
        pass
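
# Illustrative programmatic use (the module name "tvtick_scraper" is an
# assumption; adjust it to whatever this file is saved as):
#
#   from tvtick_scraper import scrape_tvtick
#   data = scrape_tvtick()  # {site_id: {'xmltv_id', 'display_name', 'programs'}} or None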