import os import sys import time import requests import subprocess import gzip import io from datetime import datetime from PyQt5.QtWidgets import ( QApplication, QMainWindow, QVBoxLayout, QListWidget, QWidget, QHBoxLayout, QListWidgetItem, QLabel, QSplitter, QFrame, QPushButton, QSlider, QSizePolicy ) from PyQt5.QtCore import Qt, QTimer, QEvent, QTime, QUrl, QThread, pyqtSignal import vlc import xml.etree.ElementTree as ET class VideoFrame(QFrame): def __init__(self, parent=None): super().__init__(parent) self.setStyleSheet("background-color: black;") class VLCPlayer: def __init__(self, video_frame): self.instance = vlc.Instance() self.media_player = self.instance.media_player_new() if sys.platform == "win32": self.media_player.set_hwnd(int(video_frame.winId())) else: self.media_player.set_xwindow(int(video_frame.winId())) def play_stream(self, url): media = self.instance.media_new(url) media.add_option(":network-caching=1000") self.media_player.set_media(media) self.media_player.play() self.media_player.audio_set_volume(70) def stop(self): self.media_player.stop() def release(self): self.media_player.release() self.instance.release() class EPGDownloader(QThread): epg_downloaded = pyqtSignal(bytes) error_occurred = pyqtSignal(str) def __init__(self, epg_url): super().__init__() self.epg_url = epg_url def run(self): try: response = requests.get(self.epg_url, timeout=10) response.raise_for_status() self.epg_downloaded.emit(response.content) except Exception as e: self.error_occurred.emit(f"EPG download failed: {str(e)}") class IPTVWindow(QMainWindow): def __init__(self, php_dir): super().__init__() self.setWindowTitle("IPTV Player with EPG") self.setGeometry(100, 100, 1280, 720) self.php_dir = php_dir self._is_fullscreen = False self._normal_geometry = None self._normal_flags = None self.current_channel_epg = None self.epg_url = None self.epg_data = None self.epg_last_update = None self.debug_log = [] self.init_ui() self.installEventFilter(self) self.download_playlist() def log_debug(self, message): """Add debug message to log and print to console""" timestamp = datetime.now().strftime("%H:%M:%S") full_message = f"[{timestamp}] {message}" self.debug_log.append(full_message) print(full_message) if hasattr(self, 'debug_display'): self.debug_display.addItem(full_message) self.debug_display.scrollToBottom() def init_ui(self): self.splitter = QSplitter(Qt.Horizontal) # Left panel with channels and debug info left_panel = QWidget() left_layout = QVBoxLayout(left_panel) self.channel_list = QListWidget() self.channel_list.itemClicked.connect(self.on_channel_clicked) left_layout.addWidget(self.channel_list) # Debug console self.debug_display = QListWidget() self.debug_display.setStyleSheet(""" background-color: #111; color: #0f0; font-family: Consolas, monospace; font-size: 12px; """) left_layout.addWidget(self.debug_display) self.splitter.addWidget(left_panel) # Right panel with video and EPG right_panel = QWidget() layout = QVBoxLayout(right_panel) layout.setContentsMargins(0, 0, 0, 0) layout.setSpacing(0) self.video_container = QWidget() self.video_container.setStyleSheet("background-color: black;") self.video_container.setMinimumHeight(400) video_container_layout = QVBoxLayout(self.video_container) video_container_layout.setContentsMargins(0, 0, 0, 0) self.video_frame = VideoFrame(self.video_container) video_container_layout.addWidget(self.video_frame) layout.addWidget(self.video_container, stretch=3) self.player = VLCPlayer(self.video_frame) self.epg_display = QLabel("EPG info will appear here") self.epg_display.setStyleSheet(""" background-color: #222; color: white; padding: 10px; font-size: 14px; """) self.epg_display.setWordWrap(True) layout.addWidget(self.epg_display, stretch=1) # Timers self.clock_timer = QTimer() self.clock_timer.timeout.connect(self.refresh_epg_time) self.clock_timer.start(1000) self.epg_update_timer = QTimer() self.epg_update_timer.timeout.connect(self.update_epg_data) self.epg_update_timer.start(3600000) # Update EPG every hour # Controls self.controls = QWidget() ctl_layout = QHBoxLayout(self.controls) for text, slot in [("⏮️", self.play_previous_channel), ("⏯️", self.toggle_play_pause), ("⏭️", self.play_next_channel), ("🖵", self.toggle_fullscreen), ("🔄", self.update_epg_data)]: btn = QPushButton(text) btn.clicked.connect(slot) ctl_layout.addWidget(btn) self.volume_slider = QSlider(Qt.Horizontal) self.volume_slider.setRange(0, 200) self.volume_slider.setValue(70) self.volume_slider.setToolTip("Volume") self.volume_slider.valueChanged.connect(self.set_volume) ctl_layout.addWidget(self.volume_slider) self.mute_button = QPushButton("🔊") self.mute_button.setCheckable(True) self.mute_button.clicked.connect(self.toggle_mute) ctl_layout.addWidget(self.mute_button) layout.addWidget(self.controls) self.splitter.addWidget(right_panel) self.splitter.setSizes([400, 880]) self.setCentralWidget(self.splitter) self.log_debug("Application initialized") def set_volume(self, value): self.player.media_player.audio_set_volume(value) if value == 0: self.player.media_player.audio_set_mute(True) self.mute_button.setChecked(True) self.mute_button.setText("🔇") else: self.player.media_player.audio_set_mute(False) self.mute_button.setChecked(False) self.mute_button.setText("🔊") def toggle_mute(self): is_muted = self.player.media_player.audio_get_mute() self.player.media_player.audio_set_mute(not is_muted) self.mute_button.setText("🔇" if not is_muted else "🔊") self.mute_button.setChecked(not is_muted) def update_epg_info(self, name, resolution, epg_data=None): current_time = QTime.currentTime().toString("HH:mm:ss") if epg_data: epg_text = f""" {name} | {resolution} | {current_time} Now Playing: {epg_data.get('title', 'N/A')} Start: {epg_data.get('start', 'N/A')} End: {epg_data.get('stop', 'N/A')} Description: {epg_data.get('desc', 'N/A')} """ else: epg_text = f"{name} | {resolution} | {current_time}\nWaiting for EPG data..." self.epg_display.setText(epg_text) def refresh_epg_time(self): if not hasattr(self, 'current_channel_name') or not hasattr(self, 'current_channel_resolution'): return text = self.epg_display.text() parts = text.split("\n", 1) if len(parts) == 2: first_line, epg_line = parts if "|" in first_line: name_resolution = first_line.split("|") if len(name_resolution) >= 2: name = name_resolution[0].replace("", "").replace("", "").strip() resolution = name_resolution[1].strip() self.update_epg_info(name, resolution, self.current_channel_epg) def update_epg_data(self): if not self.epg_url: self.log_debug("No EPG URL available") return self.log_debug(f"Downloading EPG data from: {self.epg_url}") self.epg_downloader = EPGDownloader(self.epg_url) self.epg_downloader.epg_downloaded.connect(self.process_epg_data) self.epg_downloader.error_occurred.connect(self.log_debug) self.epg_downloader.start() def process_epg_data(self, epg_bytes): try: # Try to decompress the data try: epg_xml = gzip.decompress(epg_bytes).decode('utf-8') self.log_debug("EPG data decompressed successfully") except: epg_xml = epg_bytes.decode('utf-8') self.log_debug("EPG data was not compressed") # Save the raw XML for debugging debug_epg_path = os.path.join(self.php_dir, "last_epg.xml") with open(debug_epg_path, "w", encoding="utf-8") as f: f.write(epg_xml) self.log_debug(f"Saved EPG data to {debug_epg_path}") # Parse the XML self.epg_data = ET.fromstring(epg_xml) self.epg_last_update = datetime.now() self.log_debug(f"EPG data parsed successfully, last update: {self.epg_last_update}") # Update current channel's EPG if available if hasattr(self, 'current_channel_id'): self.update_current_epg() except Exception as e: self.log_debug(f"Error processing EPG data: {str(e)}") def update_current_epg(self): if not self.epg_data or not hasattr(self, 'current_channel_id'): return try: current_time = datetime.now().strftime("%Y%m%d%H%M%S +0000") self.log_debug(f"Looking for current program (time: {current_time})") found = False for programme in self.epg_data.findall('.//programme'): channel_id = programme.get('channel') if channel_id != self.current_channel_id: continue start = programme.get('start') stop = programme.get('stop') if start <= current_time <= stop: title = programme.find('title').text if programme.find('title') is not None else 'N/A' desc = programme.find('desc').text if programme.find('desc') is not None else 'N/A' # Format times for display try: start_time = datetime.strptime(start[:14], "%Y%m%d%H%M%S").strftime("%H:%M") stop_time = datetime.strptime(stop[:14], "%Y%m%d%H%M%S").strftime("%H:%M") except: start_time = start[8:12] # Just show hours and minutes stop_time = stop[8:12] self.current_channel_epg = { 'title': title, 'start': start_time, 'stop': stop_time, 'desc': desc } found = True break if not found: self.current_channel_epg = None # Update the display if hasattr(self, 'current_channel_name') and hasattr(self, 'current_channel_resolution'): self.update_epg_info(self.current_channel_name, self.current_channel_resolution, self.current_channel_epg) except Exception as e: self.log_debug(f"Error updating current EPG: {str(e)}") def detect_resolution(self, item, base): w = self.player.media_player.video_get_width() h = self.player.media_player.video_get_height() fps = self.player.media_player.get_fps() if w > 0 and h > 0: label = "SD" if w >= 3840: label = "4K" elif w >= 1920: label = "FHD" elif w >= 1280: label = "HD" info = f"{label}, {fps:.2f}FPS" item.setText(f"{base} ({info})") self.current_channel_name = base self.current_channel_resolution = info self.update_epg_info(base, info) def on_channel_clicked(self, item: QListWidgetItem): url = item.data(Qt.UserRole) self.player.play_stream(url) # Get channel ID from item data channel_id = item.data(Qt.UserRole + 1) if channel_id: self.current_channel_id = channel_id self.update_current_epg() base = item.text().split('(')[0].strip() item.setText(f"{base} (Loading...)") self.current_channel_name = base self.update_epg_info(base, "Loading...") QTimer.singleShot(4000, lambda: self.detect_resolution(item, base)) def toggle_play_pause(self): mp = self.player.media_player if mp.is_playing(): mp.pause() else: mp.play() def play_next_channel(self): idx = self.channel_list.currentRow() if idx < self.channel_list.count() - 1: self.channel_list.setCurrentRow(idx+1) self.on_channel_clicked(self.channel_list.currentItem()) def play_previous_channel(self): idx = self.channel_list.currentRow() if idx > 0: self.channel_list.setCurrentRow(idx-1) self.on_channel_clicked(self.channel_list.currentItem()) def toggle_fullscreen(self): if not self._is_fullscreen: self._normal_geometry = self.geometry() self._normal_flags = self.windowFlags() self.channel_list.hide() self.epg_display.hide() self.controls.hide() self.setWindowFlags(self._normal_flags | Qt.FramelessWindowHint) self.showFullScreen() else: self.setWindowFlags(self._normal_flags) self.showNormal() if self._normal_geometry: self.setGeometry(self._normal_geometry) self.channel_list.show() self.epg_display.show() self.controls.show() self._is_fullscreen = not self._is_fullscreen def keyPressEvent(self, event): if event.key() == Qt.Key_Escape and self._is_fullscreen: self.toggle_fullscreen() else: super().keyPressEvent(event) def download_playlist(self): try: os.makedirs(self.php_dir, exist_ok=True) url = "https://iptv.nywebforum.com/playlist.m3u" self.log_debug(f"Downloading playlist from: {url}") response = requests.get(url, timeout=10) content = response.text.replace( "https://iptv.nywebforum.com", "http://localhost:8888" ) # Extract EPG URL from the first line if present first_line = response.text.split('\n')[0] if first_line.startswith("#EXTM3U") and "url-tvg=" in first_line: self.epg_url = first_line.split('url-tvg="')[1].split('"')[0] self.log_debug(f"Found EPG URL: {self.epg_url}") self.update_epg_data() # Download EPG immediately path = os.path.join(self.php_dir, "playlist.m3u") with open(path, "w", encoding="utf-8") as f: f.write(content) self.log_debug(f"Playlist saved to: {path}") self.load_playlist(path) except Exception as e: self.log_debug(f"Failed to download playlist: {e}") def load_playlist(self, path): try: self.log_debug(f"Loading playlist from: {path}") lines = open(path, encoding="utf-8").read().splitlines() self.channel_list.clear() first = None name = None channel_id = None for i, l in enumerate(lines): if l.startswith("#EXTINF:"): # Extract channel name name = l.split(',')[-1].strip() # Extract channel ID if present (tvg-id attribute) if 'tvg-id="' in l: channel_id = l.split('tvg-id="')[1].split('"')[0] self.log_debug(f"Found channel ID: {channel_id} for {name}") elif l and not l.startswith("#") and name: it = QListWidgetItem(f"{name} (Loading...)") it.setData(Qt.UserRole, l) # Store channel ID with the item if available if channel_id: it.setData(Qt.UserRole + 1, channel_id) self.channel_list.addItem(it) if first is None: first = it name = None channel_id = None if first: self.channel_list.setCurrentItem(first) self.log_debug(f"Found {self.channel_list.count()} channels, selecting first one") QTimer.singleShot(1000, lambda: self.on_channel_clicked(first)) except Exception as e: self.log_debug(f"Failed to load playlist: {e}") def closeEvent(self, event): self.log_debug("Application closing - cleaning up resources") # Stop timers self.clock_timer.stop() self.epg_update_timer.stop() # Stop player self.player.stop() self.player.release() # Stop any ongoing EPG download if hasattr(self, 'epg_downloader') and self.epg_downloader.isRunning(): self.epg_downloader.quit() self.epg_downloader.wait() super().closeEvent(event) def start_php_server(php_dir, port=8888): try: os.chdir(php_dir) return subprocess.Popen([ "php", "-S", f"localhost:{port}"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0 ) except Exception as e: print(f"Failed to start PHP server: {e}") return None if __name__ == "__main__": php_dir = os.path.join(os.path.dirname(__file__), "php_files") os.makedirs(php_dir, exist_ok=True) php_proc = start_php_server(php_dir) app = QApplication(sys.argv) window = IPTVWindow(php_dir) window.show() exit_code = app.exec_() # Ensure clean exit if php_proc: php_proc.terminate() php_proc.wait(2) sys.exit(exit_code)