import os import sys import time import requests import subprocess import gzip import io from datetime import datetime from PyQt5.QtWidgets import ( QApplication, QMainWindow, QVBoxLayout, QListWidget, QWidget, QHBoxLayout, QListWidgetItem, QLabel, QSplitter, QFrame, QPushButton, QSlider, QSizePolicy ) from PyQt5.QtCore import Qt, QTimer, QEvent, QTime, QUrl, QThread, pyqtSignal from PyQt5.QtNetwork import QNetworkAccessManager, QNetworkRequest import vlc import xml.etree.ElementTree as ET class VideoFrame(QFrame): def __init__(self, parent=None): super().__init__(parent) self.setStyleSheet("background-color: black;") class VLCPlayer: def __init__(self, video_frame): self.instance = vlc.Instance() self.media_player = self.instance.media_player_new() if sys.platform == "win32": self.media_player.set_hwnd(int(video_frame.winId())) else: self.media_player.set_xwindow(int(video_frame.winId())) def play_stream(self, url): media = self.instance.media_new(url) media.add_option(":network-caching=1000") self.media_player.set_media(media) self.media_player.play() self.media_player.audio_set_volume(70) def stop(self): self.media_player.stop() def release(self): self.media_player.release() self.instance.release() class EPGDownloader(QThread): epg_downloaded = pyqtSignal(bytes) error_occurred = pyqtSignal(str) def __init__(self, epg_url): super().__init__() self.epg_url = epg_url def run(self): try: response = requests.get(self.epg_url, timeout=10) response.raise_for_status() # Check if content is gzipped if response.headers.get('Content-Encoding') == 'gzip': self.epg_downloaded.emit(response.content) else: # Try to decompress anyway (some servers don't set proper headers) try: decompressed = gzip.decompress(response.content) self.epg_downloaded.emit(decompressed) except: self.epg_downloaded.emit(response.content) except Exception as e: self.error_occurred.emit(f"EPG download failed: {str(e)}") class IPTVWindow(QMainWindow): def __init__(self, php_dir): super().__init__() self.setWindowTitle("IPTV Player with EPG [DEBUG MODE]") self.setGeometry(100, 100, 1280, 720) self.php_dir = php_dir self._is_fullscreen = False self._normal_geometry = None self._normal_flags = None self.current_channel_epg = None self.epg_url = None self.debug_log = [] # Network manager for EPG requests self.network_manager = QNetworkAccessManager() self.network_manager.finished.connect(self.handle_epg_response) self.init_ui() self.installEventFilter(self) self.download_playlist() def log_debug(self, message): """Add debug message to log and print to console""" timestamp = datetime.now().strftime("%H:%M:%S") full_message = f"[{timestamp}] {message}" self.debug_log.append(full_message) print(full_message) # Update debug info in UI if available if hasattr(self, 'debug_display'): self.debug_display.addItem(full_message) # Fixed: using addItem instead of append self.debug_display.scrollToBottom() def init_ui(self): self.splitter = QSplitter(Qt.Horizontal) # Left panel with channels and debug info left_panel = QWidget() left_layout = QVBoxLayout(left_panel) self.channel_list = QListWidget() self.channel_list.itemClicked.connect(self.on_channel_clicked) left_layout.addWidget(self.channel_list) # Debug console self.debug_display = QListWidget() self.debug_display.setStyleSheet(""" background-color: #111; color: #0f0; font-family: Consolas, monospace; font-size: 12px; """) left_layout.addWidget(self.debug_display) self.splitter.addWidget(left_panel) # Right panel with video and EPG right_panel = QWidget() layout = QVBoxLayout(right_panel) layout.setContentsMargins(0, 0, 0, 0) layout.setSpacing(0) self.video_container = QWidget() self.video_container.setStyleSheet("background-color: black;") self.video_container.setMinimumHeight(400) video_container_layout = QVBoxLayout(self.video_container) video_container_layout.setContentsMargins(0, 0, 0, 0) self.video_frame = VideoFrame(self.video_container) video_container_layout.addWidget(self.video_frame) layout.addWidget(self.video_container, stretch=3) self.player = VLCPlayer(self.video_frame) self.epg_display = QLabel("EPG info will appear here") self.epg_display.setStyleSheet(""" background-color: #222; color: white; padding: 10px; font-size: 14px; """) self.epg_display.setWordWrap(True) layout.addWidget(self.epg_display, stretch=1) # Timers self.clock_timer = QTimer() self.clock_timer.timeout.connect(self.refresh_epg_time) self.clock_timer.start(1000) self.epg_update_timer = QTimer() self.epg_update_timer.timeout.connect(self.update_current_epg) self.epg_update_timer.start(30000) # Controls self.controls = QWidget() ctl_layout = QHBoxLayout(self.controls) for text, slot in [("⏮️", self.play_previous_channel), ("⏯️", self.toggle_play_pause), ("⏭️", self.play_next_channel), ("🖵", self.toggle_fullscreen), ("🐞", self.show_debug_info)]: btn = QPushButton(text) btn.clicked.connect(slot) ctl_layout.addWidget(btn) self.volume_slider = QSlider(Qt.Horizontal) self.volume_slider.setRange(0, 200) self.volume_slider.setValue(70) self.volume_slider.setToolTip("Volume") self.volume_slider.valueChanged.connect(self.set_volume) ctl_layout.addWidget(self.volume_slider) self.mute_button = QPushButton("🔊") self.mute_button.setCheckable(True) self.mute_button.clicked.connect(self.toggle_mute) ctl_layout.addWidget(self.mute_button) layout.addWidget(self.controls) self.splitter.addWidget(right_panel) self.splitter.setSizes([400, 880]) self.setCentralWidget(self.splitter) self.log_debug("Application initialized") def show_debug_info(self): """Show detailed debug information""" debug_info = [ f"Current Channel: {getattr(self, 'current_channel_name', 'None')}", f"Channel ID: {getattr(self, 'current_channel_id', 'None')}", f"EPG URL: {self.epg_url}", f"Last EPG Data: {self.current_channel_epg}", f"Debug Log: {len(self.debug_log)} entries" ] msg = "\n".join(debug_info) self.log_debug(f"Debug Info:\n{msg}") def set_volume(self, value): self.player.media_player.audio_set_volume(value) if value == 0: self.player.media_player.audio_set_mute(True) self.mute_button.setChecked(True) self.mute_button.setText("🔇") else: self.player.media_player.audio_set_mute(False) self.mute_button.setChecked(False) self.mute_button.setText("🔊") self.log_debug(f"Volume set to {value}") def toggle_mute(self): is_muted = self.player.media_player.audio_get_mute() self.player.media_player.audio_set_mute(not is_muted) self.mute_button.setText("🔇" if not is_muted else "🔊") self.mute_button.setChecked(not is_muted) self.log_debug(f"Mute {'activated' if not is_muted else 'deactivated'}") def update_epg_info(self, name, resolution, epg_data=None): current_time = QTime.currentTime().toString("HH:mm:ss") if epg_data: epg_text = f""" {name} | {resolution} | {current_time} Now Playing: {epg_data.get('title', 'N/A')} Start: {epg_data.get('start', 'N/A')} End: {epg_data.get('stop', 'N/A')} Description: {epg_data.get('desc', 'N/A')} """ self.log_debug(f"Updated EPG for {name}: {epg_data.get('title', 'N/A')}") else: epg_text = f"{name} | {resolution} | {current_time}\nWaiting for EPG data..." self.log_debug(f"Waiting for EPG data for {name}") self.epg_display.setText(epg_text) def refresh_epg_time(self): if not hasattr(self, 'current_channel_name') or not hasattr(self, 'current_channel_resolution'): return text = self.epg_display.text() parts = text.split("\n", 1) if len(parts) == 2: first_line, epg_line = parts if "|" in first_line: name_resolution = first_line.split("|") if len(name_resolution) >= 2: name = name_resolution[0].replace("", "").replace("", "").strip() resolution = name_resolution[1].strip() self.update_epg_info(name, resolution, self.current_channel_epg) def update_epg_data(self): if not self.epg_url: self.log_debug("No EPG URL available, skipping update") return self.log_debug(f"Downloading EPG data from: {self.epg_url}") # Start EPG download in a separate thread self.epg_downloader = EPGDownloader(self.epg_url) self.epg_downloader.epg_downloaded.connect(self.process_epg_data) self.epg_downloader.error_occurred.connect(self.log_debug) self.epg_downloader.start() def process_epg_data(self, epg_bytes): try: # Try to decompress the data (even if server didn't say it's gzipped) try: epg_xml = gzip.decompress(epg_bytes).decode('utf-8') self.log_debug("EPG data decompressed successfully") except: epg_xml = epg_bytes.decode('utf-8') self.log_debug("EPG data was not compressed") # Save the raw XML for debugging debug_epg_path = os.path.join(self.php_dir, "last_epg.xml") with open(debug_epg_path, "w", encoding="utf-8") as f: f.write(epg_xml) self.log_debug(f"Saved EPG data to {debug_epg_path}") # Parse the XML self.epg_data = ET.fromstring(epg_xml) self.epg_last_update = datetime.now() self.log_debug(f"EPG data parsed successfully, last update: {self.epg_last_update}") # Update current channel's EPG if available if hasattr(self, 'current_channel_id'): self.update_current_epg() except Exception as e: self.log_debug(f"Error processing EPG data: {str(e)}") def update_current_epg(self): if not self.epg_data or not hasattr(self, 'current_channel_id'): return try: current_time = datetime.now().strftime("%Y%m%d%H%M%S +0000") self.log_debug(f"Looking for current program (time: {current_time})") found = False for programme in self.epg_data.findall('.//programme'): channel_id = programme.get('channel') if channel_id != self.current_channel_id: continue start = programme.get('start') stop = programme.get('stop') self.log_debug(f"Checking program: {start} - {stop}") if start <= current_time <= stop: title = programme.find('title').text if programme.find('title') is not None else 'N/A' desc = programme.find('desc').text if programme.find('desc') is not None else 'N/A' # Format times for display try: start_time = datetime.strptime(start[:14], "%Y%m%d%H%M%S").strftime("%H:%M") stop_time = datetime.strptime(stop[:14], "%Y%m%d%H%M%S").strftime("%H:%M") except: start_time = start[8:12] # Just show hours and minutes stop_time = stop[8:12] self.current_channel_epg = { 'title': title, 'start': start_time, 'stop': stop_time, 'desc': desc } self.log_debug(f"Current program: {title} ({start_time}-{stop_time})") found = True break if not found: self.log_debug("No current program found in EPG") self.current_channel_epg = None # Update the display if hasattr(self, 'current_channel_name') and hasattr(self, 'current_channel_resolution'): self.update_epg_info(self.current_channel_name, self.current_channel_resolution, self.current_channel_epg) except Exception as e: self.log_debug(f"Error updating current EPG: {str(e)}") def handle_epg_response(self, reply): try: self.log_debug(f"Received EPG response, status: {reply.error()}") if reply.error(): self.log_debug(f"EPG request failed: {reply.errorString()}") return data = reply.readAll().data() if not data: self.log_debug("Empty EPG response received") return self.log_debug(f"EPG data size: {len(data)} bytes") # Debug: Save raw EPG data to file debug_epg_path = os.path.join(self.php_dir, "last_epg.xml") with open(debug_epg_path, "wb") as f: f.write(data) self.log_debug(f"Saved raw EPG data to {debug_epg_path}") # Parse XML EPG data try: root = ET.fromstring(data) current_time = datetime.now().strftime("%Y%m%d%H%M%S") self.log_debug(f"Current time for EPG matching: {current_time}") # Find current program for the current channel current_channel_id = getattr(self, 'current_channel_id', None) if not current_channel_id: self.log_debug("No current channel ID set, skipping EPG processing") return self.log_debug(f"Looking for EPG data for channel ID: {current_channel_id}") found = False for programme in root.findall('.//programme'): channel_id = programme.get('channel') if channel_id != current_channel_id: continue start = programme.get('start') stop = programme.get('stop') self.log_debug(f"Found programme: {channel_id} {start}-{stop}") if start <= current_time <= stop: title = programme.find('title').text if programme.find('title') is not None else 'N/A' desc = programme.find('desc').text if programme.find('desc') is not None else 'N/A' # Format times for display try: start_time = datetime.strptime(start[:12], "%Y%m%d%H%M").strftime("%H:%M") stop_time = datetime.strptime(stop[:12], "%Y%m%d%H%M").strftime("%H:%M") except: start_time = start stop_time = stop self.current_channel_epg = { 'title': title, 'start': start_time, 'stop': stop_time, 'desc': desc } self.log_debug(f"Matched current programme: {title} ({start_time}-{stop_time})") found = True break if not found: self.log_debug("No matching programme found in EPG data") except ET.ParseError as pe: self.log_debug(f"XML parsing error: {pe}") # Debug: Print first 200 chars of the problematic XML self.log_debug(f"Problematic XML start: {data[:200].decode('utf-8', errors='replace')}") except Exception as e: self.log_debug(f"Error parsing EPG: {e}") except Exception as e: self.log_debug(f"Error in EPG response handler: {e}") def update_current_epg(self): if not self.epg_url: self.log_debug("No EPG URL available, skipping update") return if not hasattr(self, 'current_channel_id'): self.log_debug("No current channel ID, skipping EPG update") return epg_request_url = f"{self.epg_url}?channel={self.current_channel_id}" self.log_debug(f"Requesting EPG data from: {epg_request_url}") request = QNetworkRequest(QUrl(epg_request_url)) self.network_manager.get(request) def detect_resolution(self, item, base): try: w = self.player.media_player.video_get_width() h = self.player.media_player.video_get_height() fps = self.player.media_player.get_fps() self.log_debug(f"Detected video: {w}x{h} at {fps:.2f}FPS") if w > 0 and h > 0: label = "SD" if w >= 3840: label = "4K" elif w >= 1920: label = "FHD" elif w >= 1280: label = "HD" info = f"{label}, {fps:.2f}FPS" item.setText(f"{base} ({info})") self.current_channel_name = base self.current_channel_resolution = info self.update_epg_info(base, info) except Exception as e: self.log_debug(f"Error detecting resolution: {e}") def on_channel_clicked(self, item: QListWidgetItem): try: url = item.data(Qt.UserRole) self.log_debug(f"Playing channel: {url}") self.player.play_stream(url) # Get channel ID from item data channel_id = item.data(Qt.UserRole + 1) if channel_id: self.current_channel_id = channel_id self.log_debug(f"Channel ID set to: {channel_id}") self.update_current_epg() else: self.log_debug("No channel ID found for this item") base = item.text().split('(')[0].strip() item.setText(f"{base} (Loading...)") self.current_channel_name = base self.update_epg_info(base, "Loading...") QTimer.singleShot(4000, lambda: self.detect_resolution(item, base)) except Exception as e: self.log_debug(f"Error in channel click handler: {e}") def toggle_play_pause(self): mp = self.player.media_player if mp.is_playing(): mp.pause() self.log_debug("Playback paused") else: mp.play() self.log_debug("Playback resumed") def play_next_channel(self): idx = self.channel_list.currentRow() if idx < self.channel_list.count() - 1: self.channel_list.setCurrentRow(idx+1) self.on_channel_clicked(self.channel_list.currentItem()) self.log_debug("Switched to next channel") def play_previous_channel(self): idx = self.channel_list.currentRow() if idx > 0: self.channel_list.setCurrentRow(idx-1) self.on_channel_clicked(self.channel_list.currentItem()) self.log_debug("Switched to previous channel") def toggle_fullscreen(self): if not self._is_fullscreen: self._normal_geometry = self.geometry() self._normal_flags = self.windowFlags() self.channel_list.hide() self.epg_display.hide() self.controls.hide() self.setWindowFlags(self._normal_flags | Qt.FramelessWindowHint) self.showFullScreen() self.log_debug("Entered fullscreen mode") else: self.setWindowFlags(self._normal_flags) self.showNormal() if self._normal_geometry: self.setGeometry(self._normal_geometry) self.channel_list.show() self.epg_display.show() self.controls.show() self.log_debug("Exited fullscreen mode") self._is_fullscreen = not self._is_fullscreen def keyPressEvent(self, event): if event.key() == Qt.Key_Escape and self._is_fullscreen: self.toggle_fullscreen() else: super().keyPressEvent(event) def download_playlist(self): try: os.makedirs(self.php_dir, exist_ok=True) url = "https://iptv.nywebforum.com/playlist.m3u" self.log_debug(f"Downloading playlist from: {url}") response = requests.get(url, timeout=10) content = response.text.replace( "https://iptv.nywebforum.com", "http://localhost:8888" ) # Extract EPG URL from the first line if present first_line = response.text.split('\n')[0] if first_line.startswith("#EXTM3U") and "url-tvg=" in first_line: self.epg_url = first_line.split('url-tvg="')[1].split('"')[0] self.log_debug(f"Found EPG URL: {self.epg_url}") path = os.path.join(self.php_dir, "playlist.m3u") with open(path, "w", encoding="utf-8") as f: f.write(content) self.log_debug(f"Playlist saved to: {path}") self.load_playlist(path) except Exception as e: self.log_debug(f"Failed to download playlist: {e}") def load_playlist(self, path): try: self.log_debug(f"Loading playlist from: {path}") lines = open(path, encoding="utf-8").read().splitlines() self.channel_list.clear() first = None name = None channel_id = None for i, l in enumerate(lines): if l.startswith("#EXTINF:"): # Extract channel name name = l.split(',')[-1].strip() # Extract channel ID if present (tvg-id attribute) if 'tvg-id="' in l: channel_id = l.split('tvg-id="')[1].split('"')[0] self.log_debug(f"Found channel ID: {channel_id} for {name}") elif l and not l.startswith("#") and name: it = QListWidgetItem(f"{name} (Loading...)") it.setData(Qt.UserRole, l) # Store channel ID with the item if available if channel_id: it.setData(Qt.UserRole + 1, channel_id) self.channel_list.addItem(it) if first is None: first = it name = None channel_id = None if first: self.channel_list.setCurrentItem(first) self.log_debug(f"Found {self.channel_list.count()} channels, selecting first one") QTimer.singleShot(1000, lambda: self.on_channel_clicked(first)) except Exception as e: self.log_debug(f"Failed to load playlist: {e}") def closeEvent(self, event): self.player.stop() self.player.release() self.log_debug("Application closing") super().closeEvent(event) def start_php_server(php_dir, port=8888): try: os.chdir(php_dir) return subprocess.Popen([ "php", "-S", f"localhost:{port}"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0 ) except Exception as e: print(f"Failed to start PHP server: {e}") return None if __name__ == "__main__": php_dir = os.path.join(os.path.dirname(__file__), "php_files") os.makedirs(php_dir, exist_ok=True) php_proc = start_php_server(php_dir) app = QApplication(sys.argv) window = IPTVWindow(php_dir) window.show() exit_code = app.exec_() window.player.stop() window.player.release() if php_proc: php_proc.terminate() php_proc.wait(2) sys.exit(exit_code)