import os
import sys
import time
import requests
import subprocess
import gzip
import io
from datetime import datetime
from PyQt5.QtWidgets import (
QApplication, QMainWindow, QVBoxLayout, QListWidget, QWidget, QHBoxLayout,
QListWidgetItem, QLabel, QSplitter, QFrame, QPushButton, QSlider, QSizePolicy
)
from PyQt5.QtCore import Qt, QTimer, QEvent, QTime, QUrl, QThread, pyqtSignal
from PyQt5.QtNetwork import QNetworkAccessManager, QNetworkRequest
import vlc
import xml.etree.ElementTree as ET
class VideoFrame(QFrame):
def __init__(self, parent=None):
super().__init__(parent)
self.setStyleSheet("background-color: black;")
class VLCPlayer:
def __init__(self, video_frame):
self.instance = vlc.Instance()
self.media_player = self.instance.media_player_new()
if sys.platform == "win32":
self.media_player.set_hwnd(int(video_frame.winId()))
else:
self.media_player.set_xwindow(int(video_frame.winId()))
def play_stream(self, url):
media = self.instance.media_new(url)
media.add_option(":network-caching=1000")
self.media_player.set_media(media)
self.media_player.play()
self.media_player.audio_set_volume(70)
def stop(self):
self.media_player.stop()
def release(self):
self.media_player.release()
self.instance.release()
class EPGDownloader(QThread):
epg_downloaded = pyqtSignal(bytes)
error_occurred = pyqtSignal(str)
def __init__(self, epg_url):
super().__init__()
self.epg_url = epg_url
def run(self):
try:
response = requests.get(self.epg_url, timeout=10)
response.raise_for_status()
# Check if content is gzipped
if response.headers.get('Content-Encoding') == 'gzip':
self.epg_downloaded.emit(response.content)
else:
# Try to decompress anyway (some servers don't set proper headers)
try:
decompressed = gzip.decompress(response.content)
self.epg_downloaded.emit(decompressed)
except:
self.epg_downloaded.emit(response.content)
except Exception as e:
self.error_occurred.emit(f"EPG download failed: {str(e)}")
class IPTVWindow(QMainWindow):
def __init__(self, php_dir):
super().__init__()
self.setWindowTitle("IPTV Player with EPG [DEBUG MODE]")
self.setGeometry(100, 100, 1280, 720)
self.php_dir = php_dir
self._is_fullscreen = False
self._normal_geometry = None
self._normal_flags = None
self.current_channel_epg = None
self.epg_url = None
self.debug_log = []
# Network manager for EPG requests
self.network_manager = QNetworkAccessManager()
self.network_manager.finished.connect(self.handle_epg_response)
self.init_ui()
self.installEventFilter(self)
self.download_playlist()
def log_debug(self, message):
"""Add debug message to log and print to console"""
timestamp = datetime.now().strftime("%H:%M:%S")
full_message = f"[{timestamp}] {message}"
self.debug_log.append(full_message)
print(full_message)
# Update debug info in UI if available
if hasattr(self, 'debug_display'):
self.debug_display.addItem(full_message) # Fixed: using addItem instead of append
self.debug_display.scrollToBottom()
def init_ui(self):
self.splitter = QSplitter(Qt.Horizontal)
# Left panel with channels and debug info
left_panel = QWidget()
left_layout = QVBoxLayout(left_panel)
self.channel_list = QListWidget()
self.channel_list.itemClicked.connect(self.on_channel_clicked)
left_layout.addWidget(self.channel_list)
# Debug console
self.debug_display = QListWidget()
self.debug_display.setStyleSheet("""
background-color: #111;
color: #0f0;
font-family: Consolas, monospace;
font-size: 12px;
""")
left_layout.addWidget(self.debug_display)
self.splitter.addWidget(left_panel)
# Right panel with video and EPG
right_panel = QWidget()
layout = QVBoxLayout(right_panel)
layout.setContentsMargins(0, 0, 0, 0)
layout.setSpacing(0)
self.video_container = QWidget()
self.video_container.setStyleSheet("background-color: black;")
self.video_container.setMinimumHeight(400)
video_container_layout = QVBoxLayout(self.video_container)
video_container_layout.setContentsMargins(0, 0, 0, 0)
self.video_frame = VideoFrame(self.video_container)
video_container_layout.addWidget(self.video_frame)
layout.addWidget(self.video_container, stretch=3)
self.player = VLCPlayer(self.video_frame)
self.epg_display = QLabel("EPG info will appear here")
self.epg_display.setStyleSheet("""
background-color: #222;
color: white;
padding: 10px;
font-size: 14px;
""")
self.epg_display.setWordWrap(True)
layout.addWidget(self.epg_display, stretch=1)
# Timers
self.clock_timer = QTimer()
self.clock_timer.timeout.connect(self.refresh_epg_time)
self.clock_timer.start(1000)
self.epg_update_timer = QTimer()
self.epg_update_timer.timeout.connect(self.update_current_epg)
self.epg_update_timer.start(30000)
# Controls
self.controls = QWidget()
ctl_layout = QHBoxLayout(self.controls)
for text, slot in [("⏮️", self.play_previous_channel),
("⏯️", self.toggle_play_pause),
("⏭️", self.play_next_channel),
("🖵", self.toggle_fullscreen),
("🐞", self.show_debug_info)]:
btn = QPushButton(text)
btn.clicked.connect(slot)
ctl_layout.addWidget(btn)
self.volume_slider = QSlider(Qt.Horizontal)
self.volume_slider.setRange(0, 200)
self.volume_slider.setValue(70)
self.volume_slider.setToolTip("Volume")
self.volume_slider.valueChanged.connect(self.set_volume)
ctl_layout.addWidget(self.volume_slider)
self.mute_button = QPushButton("🔊")
self.mute_button.setCheckable(True)
self.mute_button.clicked.connect(self.toggle_mute)
ctl_layout.addWidget(self.mute_button)
layout.addWidget(self.controls)
self.splitter.addWidget(right_panel)
self.splitter.setSizes([400, 880])
self.setCentralWidget(self.splitter)
self.log_debug("Application initialized")
def show_debug_info(self):
"""Show detailed debug information"""
debug_info = [
f"Current Channel: {getattr(self, 'current_channel_name', 'None')}",
f"Channel ID: {getattr(self, 'current_channel_id', 'None')}",
f"EPG URL: {self.epg_url}",
f"Last EPG Data: {self.current_channel_epg}",
f"Debug Log: {len(self.debug_log)} entries"
]
msg = "\n".join(debug_info)
self.log_debug(f"Debug Info:\n{msg}")
def set_volume(self, value):
self.player.media_player.audio_set_volume(value)
if value == 0:
self.player.media_player.audio_set_mute(True)
self.mute_button.setChecked(True)
self.mute_button.setText("🔇")
else:
self.player.media_player.audio_set_mute(False)
self.mute_button.setChecked(False)
self.mute_button.setText("🔊")
self.log_debug(f"Volume set to {value}")
def toggle_mute(self):
is_muted = self.player.media_player.audio_get_mute()
self.player.media_player.audio_set_mute(not is_muted)
self.mute_button.setText("🔇" if not is_muted else "🔊")
self.mute_button.setChecked(not is_muted)
self.log_debug(f"Mute {'activated' if not is_muted else 'deactivated'}")
def update_epg_info(self, name, resolution, epg_data=None):
current_time = QTime.currentTime().toString("HH:mm:ss")
if epg_data:
epg_text = f"""
{name} | {resolution} | {current_time}
Now Playing: {epg_data.get('title', 'N/A')}
Start: {epg_data.get('start', 'N/A')}
End: {epg_data.get('stop', 'N/A')}
Description: {epg_data.get('desc', 'N/A')}
"""
self.log_debug(f"Updated EPG for {name}: {epg_data.get('title', 'N/A')}")
else:
epg_text = f"{name} | {resolution} | {current_time}\nWaiting for EPG data..."
self.log_debug(f"Waiting for EPG data for {name}")
self.epg_display.setText(epg_text)
def refresh_epg_time(self):
if not hasattr(self, 'current_channel_name') or not hasattr(self, 'current_channel_resolution'):
return
text = self.epg_display.text()
parts = text.split("\n", 1)
if len(parts) == 2:
first_line, epg_line = parts
if "|" in first_line:
name_resolution = first_line.split("|")
if len(name_resolution) >= 2:
name = name_resolution[0].replace("", "").replace("", "").strip()
resolution = name_resolution[1].strip()
self.update_epg_info(name, resolution, self.current_channel_epg)
def update_epg_data(self):
if not self.epg_url:
self.log_debug("No EPG URL available, skipping update")
return
self.log_debug(f"Downloading EPG data from: {self.epg_url}")
# Start EPG download in a separate thread
self.epg_downloader = EPGDownloader(self.epg_url)
self.epg_downloader.epg_downloaded.connect(self.process_epg_data)
self.epg_downloader.error_occurred.connect(self.log_debug)
self.epg_downloader.start()
def process_epg_data(self, epg_bytes):
try:
# Try to decompress the data (even if server didn't say it's gzipped)
try:
epg_xml = gzip.decompress(epg_bytes).decode('utf-8')
self.log_debug("EPG data decompressed successfully")
except:
epg_xml = epg_bytes.decode('utf-8')
self.log_debug("EPG data was not compressed")
# Save the raw XML for debugging
debug_epg_path = os.path.join(self.php_dir, "last_epg.xml")
with open(debug_epg_path, "w", encoding="utf-8") as f:
f.write(epg_xml)
self.log_debug(f"Saved EPG data to {debug_epg_path}")
# Parse the XML
self.epg_data = ET.fromstring(epg_xml)
self.epg_last_update = datetime.now()
self.log_debug(f"EPG data parsed successfully, last update: {self.epg_last_update}")
# Update current channel's EPG if available
if hasattr(self, 'current_channel_id'):
self.update_current_epg()
except Exception as e:
self.log_debug(f"Error processing EPG data: {str(e)}")
def update_current_epg(self):
if not self.epg_data or not hasattr(self, 'current_channel_id'):
return
try:
current_time = datetime.now().strftime("%Y%m%d%H%M%S +0000")
self.log_debug(f"Looking for current program (time: {current_time})")
found = False
for programme in self.epg_data.findall('.//programme'):
channel_id = programme.get('channel')
if channel_id != self.current_channel_id:
continue
start = programme.get('start')
stop = programme.get('stop')
self.log_debug(f"Checking program: {start} - {stop}")
if start <= current_time <= stop:
title = programme.find('title').text if programme.find('title') is not None else 'N/A'
desc = programme.find('desc').text if programme.find('desc') is not None else 'N/A'
# Format times for display
try:
start_time = datetime.strptime(start[:14], "%Y%m%d%H%M%S").strftime("%H:%M")
stop_time = datetime.strptime(stop[:14], "%Y%m%d%H%M%S").strftime("%H:%M")
except:
start_time = start[8:12] # Just show hours and minutes
stop_time = stop[8:12]
self.current_channel_epg = {
'title': title,
'start': start_time,
'stop': stop_time,
'desc': desc
}
self.log_debug(f"Current program: {title} ({start_time}-{stop_time})")
found = True
break
if not found:
self.log_debug("No current program found in EPG")
self.current_channel_epg = None
# Update the display
if hasattr(self, 'current_channel_name') and hasattr(self, 'current_channel_resolution'):
self.update_epg_info(self.current_channel_name,
self.current_channel_resolution,
self.current_channel_epg)
except Exception as e:
self.log_debug(f"Error updating current EPG: {str(e)}")
def handle_epg_response(self, reply):
try:
self.log_debug(f"Received EPG response, status: {reply.error()}")
if reply.error():
self.log_debug(f"EPG request failed: {reply.errorString()}")
return
data = reply.readAll().data()
if not data:
self.log_debug("Empty EPG response received")
return
self.log_debug(f"EPG data size: {len(data)} bytes")
# Debug: Save raw EPG data to file
debug_epg_path = os.path.join(self.php_dir, "last_epg.xml")
with open(debug_epg_path, "wb") as f:
f.write(data)
self.log_debug(f"Saved raw EPG data to {debug_epg_path}")
# Parse XML EPG data
try:
root = ET.fromstring(data)
current_time = datetime.now().strftime("%Y%m%d%H%M%S")
self.log_debug(f"Current time for EPG matching: {current_time}")
# Find current program for the current channel
current_channel_id = getattr(self, 'current_channel_id', None)
if not current_channel_id:
self.log_debug("No current channel ID set, skipping EPG processing")
return
self.log_debug(f"Looking for EPG data for channel ID: {current_channel_id}")
found = False
for programme in root.findall('.//programme'):
channel_id = programme.get('channel')
if channel_id != current_channel_id:
continue
start = programme.get('start')
stop = programme.get('stop')
self.log_debug(f"Found programme: {channel_id} {start}-{stop}")
if start <= current_time <= stop:
title = programme.find('title').text if programme.find('title') is not None else 'N/A'
desc = programme.find('desc').text if programme.find('desc') is not None else 'N/A'
# Format times for display
try:
start_time = datetime.strptime(start[:12], "%Y%m%d%H%M").strftime("%H:%M")
stop_time = datetime.strptime(stop[:12], "%Y%m%d%H%M").strftime("%H:%M")
except:
start_time = start
stop_time = stop
self.current_channel_epg = {
'title': title,
'start': start_time,
'stop': stop_time,
'desc': desc
}
self.log_debug(f"Matched current programme: {title} ({start_time}-{stop_time})")
found = True
break
if not found:
self.log_debug("No matching programme found in EPG data")
except ET.ParseError as pe:
self.log_debug(f"XML parsing error: {pe}")
# Debug: Print first 200 chars of the problematic XML
self.log_debug(f"Problematic XML start: {data[:200].decode('utf-8', errors='replace')}")
except Exception as e:
self.log_debug(f"Error parsing EPG: {e}")
except Exception as e:
self.log_debug(f"Error in EPG response handler: {e}")
def update_current_epg(self):
if not self.epg_url:
self.log_debug("No EPG URL available, skipping update")
return
if not hasattr(self, 'current_channel_id'):
self.log_debug("No current channel ID, skipping EPG update")
return
epg_request_url = f"{self.epg_url}?channel={self.current_channel_id}"
self.log_debug(f"Requesting EPG data from: {epg_request_url}")
request = QNetworkRequest(QUrl(epg_request_url))
self.network_manager.get(request)
def detect_resolution(self, item, base):
try:
w = self.player.media_player.video_get_width()
h = self.player.media_player.video_get_height()
fps = self.player.media_player.get_fps()
self.log_debug(f"Detected video: {w}x{h} at {fps:.2f}FPS")
if w > 0 and h > 0:
label = "SD"
if w >= 3840:
label = "4K"
elif w >= 1920:
label = "FHD"
elif w >= 1280:
label = "HD"
info = f"{label}, {fps:.2f}FPS"
item.setText(f"{base} ({info})")
self.current_channel_name = base
self.current_channel_resolution = info
self.update_epg_info(base, info)
except Exception as e:
self.log_debug(f"Error detecting resolution: {e}")
def on_channel_clicked(self, item: QListWidgetItem):
try:
url = item.data(Qt.UserRole)
self.log_debug(f"Playing channel: {url}")
self.player.play_stream(url)
# Get channel ID from item data
channel_id = item.data(Qt.UserRole + 1)
if channel_id:
self.current_channel_id = channel_id
self.log_debug(f"Channel ID set to: {channel_id}")
self.update_current_epg()
else:
self.log_debug("No channel ID found for this item")
base = item.text().split('(')[0].strip()
item.setText(f"{base} (Loading...)")
self.current_channel_name = base
self.update_epg_info(base, "Loading...")
QTimer.singleShot(4000, lambda: self.detect_resolution(item, base))
except Exception as e:
self.log_debug(f"Error in channel click handler: {e}")
def toggle_play_pause(self):
mp = self.player.media_player
if mp.is_playing():
mp.pause()
self.log_debug("Playback paused")
else:
mp.play()
self.log_debug("Playback resumed")
def play_next_channel(self):
idx = self.channel_list.currentRow()
if idx < self.channel_list.count() - 1:
self.channel_list.setCurrentRow(idx+1)
self.on_channel_clicked(self.channel_list.currentItem())
self.log_debug("Switched to next channel")
def play_previous_channel(self):
idx = self.channel_list.currentRow()
if idx > 0:
self.channel_list.setCurrentRow(idx-1)
self.on_channel_clicked(self.channel_list.currentItem())
self.log_debug("Switched to previous channel")
def toggle_fullscreen(self):
if not self._is_fullscreen:
self._normal_geometry = self.geometry()
self._normal_flags = self.windowFlags()
self.channel_list.hide()
self.epg_display.hide()
self.controls.hide()
self.setWindowFlags(self._normal_flags | Qt.FramelessWindowHint)
self.showFullScreen()
self.log_debug("Entered fullscreen mode")
else:
self.setWindowFlags(self._normal_flags)
self.showNormal()
if self._normal_geometry:
self.setGeometry(self._normal_geometry)
self.channel_list.show()
self.epg_display.show()
self.controls.show()
self.log_debug("Exited fullscreen mode")
self._is_fullscreen = not self._is_fullscreen
def keyPressEvent(self, event):
if event.key() == Qt.Key_Escape and self._is_fullscreen:
self.toggle_fullscreen()
else:
super().keyPressEvent(event)
def download_playlist(self):
try:
os.makedirs(self.php_dir, exist_ok=True)
url = "https://iptv.nywebforum.com/playlist.m3u"
self.log_debug(f"Downloading playlist from: {url}")
response = requests.get(url, timeout=10)
content = response.text.replace(
"https://iptv.nywebforum.com", "http://localhost:8888"
)
# Extract EPG URL from the first line if present
first_line = response.text.split('\n')[0]
if first_line.startswith("#EXTM3U") and "url-tvg=" in first_line:
self.epg_url = first_line.split('url-tvg="')[1].split('"')[0]
self.log_debug(f"Found EPG URL: {self.epg_url}")
path = os.path.join(self.php_dir, "playlist.m3u")
with open(path, "w", encoding="utf-8") as f:
f.write(content)
self.log_debug(f"Playlist saved to: {path}")
self.load_playlist(path)
except Exception as e:
self.log_debug(f"Failed to download playlist: {e}")
def load_playlist(self, path):
try:
self.log_debug(f"Loading playlist from: {path}")
lines = open(path, encoding="utf-8").read().splitlines()
self.channel_list.clear()
first = None
name = None
channel_id = None
for i, l in enumerate(lines):
if l.startswith("#EXTINF:"):
# Extract channel name
name = l.split(',')[-1].strip()
# Extract channel ID if present (tvg-id attribute)
if 'tvg-id="' in l:
channel_id = l.split('tvg-id="')[1].split('"')[0]
self.log_debug(f"Found channel ID: {channel_id} for {name}")
elif l and not l.startswith("#") and name:
it = QListWidgetItem(f"{name} (Loading...)")
it.setData(Qt.UserRole, l)
# Store channel ID with the item if available
if channel_id:
it.setData(Qt.UserRole + 1, channel_id)
self.channel_list.addItem(it)
if first is None:
first = it
name = None
channel_id = None
if first:
self.channel_list.setCurrentItem(first)
self.log_debug(f"Found {self.channel_list.count()} channels, selecting first one")
QTimer.singleShot(1000, lambda: self.on_channel_clicked(first))
except Exception as e:
self.log_debug(f"Failed to load playlist: {e}")
def closeEvent(self, event):
self.player.stop()
self.player.release()
self.log_debug("Application closing")
super().closeEvent(event)
def start_php_server(php_dir, port=8888):
try:
os.chdir(php_dir)
return subprocess.Popen([
"php", "-S", f"localhost:{port}"],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0
)
except Exception as e:
print(f"Failed to start PHP server: {e}")
return None
if __name__ == "__main__":
php_dir = os.path.join(os.path.dirname(__file__), "php_files")
os.makedirs(php_dir, exist_ok=True)
php_proc = start_php_server(php_dir)
app = QApplication(sys.argv)
window = IPTVWindow(php_dir)
window.show()
exit_code = app.exec_()
window.player.stop()
window.player.release()
if php_proc:
php_proc.terminate()
php_proc.wait(2)
sys.exit(exit_code)