import os
import sys
import json
import time
import uuid
import requests
import subprocess
import gzip
import threading
import hashlib
import socket
import platform
from urllib.parse import unquote
from datetime import datetime, timedelta,timezone
from PyQt5.QtWidgets import (
QMainWindow, QVBoxLayout, QListWidget, QWidget, QHBoxLayout, QDialog, QDialogButtonBox,
QListWidgetItem, QLabel, QSplitter, QFrame, QPushButton, QSlider, QSizePolicy,
QGridLayout,QFileDialog,QGraphicsView, QGraphicsScene, QGraphicsRectItem, QGraphicsSimpleTextItem,
QApplication, QAction, QGraphicsPixmapItem,QGraphicsLineItem,QGraphicsTextItem,QGraphicsItem,QLineEdit,QInputDialog, QMessageBox
)
from PyQt5.QtCore import QUrl,QObject, Qt, QTimer, QThread, pyqtSignal,QEvent
from PyQt5.QtGui import QPixmap, QFont, QColor,QBrush, QColor, QPen,QGuiApplication
from PyQt5.QtMultimedia import QMediaContent
import pytz
import vlc
import xml.etree.ElementTree as ET
# Seconds by which the red "now" line is shifted forward.
# NOTE(review): 700 s is 11 min 40 s; a 10-minute shift would be 600 - confirm the intended value.
RED_LINE_SHIFT_SECONDS = 700
# Module-level handle for the PHP server process (None here; presumably assigned by startup code elsewhere).
php_proc = None
# Directory used as the on-disk logo cache, under the user's home directory.
CACHE_DIR = os.path.join(os.path.expanduser("~"), ".dsiptv", "logos")
# Saved login credentials. NOTE: this file lives inside CACHE_DIR, so any code that
# deletes CACHE_DIR wholesale also deletes the stored credentials.
CRED_FILE = os.path.join(CACHE_DIR, "creds.json")
# Endpoint that AuthManager.authenticate() posts login requests to.
AUTH_URL = "https://iptv.nywebforum.com/admin/auth.php"
# Import-time side effect: make sure the cache directory exists.
os.makedirs(CACHE_DIR, exist_ok=True)
class AuthManager:
    """Static helpers for client identity, locally stored credentials and remote login."""

    @staticmethod
    def get_client_id():
        """Return ``"<hostname>-<uuid.getnode()>"``, or a random UUID4 string on failure."""
        try:
            return socket.gethostname() + "-" + str(uuid.getnode())
        except Exception:
            # Was a bare ``except:``, which also swallowed KeyboardInterrupt/SystemExit.
            return str(uuid.uuid4())

    @staticmethod
    def save_credentials(username, password):
        """Write username, password and client id to CRED_FILE as JSON.

        NOTE(security): the password is stored in plain text; the file is
        restricted to the current user on a best-effort basis.
        """
        os.makedirs(CACHE_DIR, exist_ok=True)
        creds = {
            "username": username,
            "password": password,
            "client_id": AuthManager.get_client_id(),
        }
        with open(CRED_FILE, "w", encoding="utf-8") as f:
            json.dump(creds, f)
        try:
            os.chmod(CRED_FILE, 0o600)
        except OSError:
            pass  # permission tightening is best-effort only

    @staticmethod
    def load_credentials():
        """Return the saved credentials dict, or None if missing, unreadable or malformed."""
        try:
            with open(CRED_FILE, encoding="utf-8") as f:
                creds = json.load(f)
        except FileNotFoundError:
            return None
        except (OSError, ValueError) as e:
            # json.JSONDecodeError is a ValueError; a corrupt file must not crash startup.
            print(f"[AUTH] Ignoring unreadable credentials file: {e}")
            return None
        # Callers index creds['username'] / creds['password'] directly.
        if not isinstance(creds, dict) or "username" not in creds or "password" not in creds:
            return None
        return creds

    @staticmethod
    def clear_credentials():
        """Delete CRED_FILE if it exists."""
        try:
            os.remove(CRED_FILE)
        except FileNotFoundError:
            pass

    @staticmethod
    def authenticate(username, password):
        """POST the login to AUTH_URL and return the decoded JSON reply.

        Never raises: any failure is reported as
        ``{"status": "error", "message": ...}``.
        """
        try:
            client_id = AuthManager.get_client_id()
            print(f"[DEBUG] Sending auth with HTTP Basic: {username=} {client_id=}")
            # Basic Auth credentials (for .htaccess)
            # NOTE(security): these are hard-coded in source; consider moving them
            # out of the codebase.
            auth_user = "admin"
            auth_pass = "36803565"
            response = requests.post(
                AUTH_URL,
                data={
                    "username": username,
                    "password": password,
                    "client_id": client_id,
                },
                auth=(auth_user, auth_pass),
                timeout=10,
            )
            print(f"[DEBUG] Response status: {response.status_code}")
            print(f"[DEBUG] Response body: {response.text}")
            if response.status_code != 200:
                return {"status": "error", "message": f"HTTP {response.status_code}: {response.text}"}
            payload = response.json()
            if not isinstance(payload, dict):
                # Callers use .get() on the result, so anything but a JSON object is an error.
                return {"status": "error", "message": "Unexpected response from server"}
            return payload
        except Exception as e:
            return {"status": "error", "message": str(e)}
class LoginDialog(QDialog):
    """Modal username/password prompt that authenticates through AuthManager."""

    def __init__(self, parent=None, preset_username="", preset_password=""):
        """Build the dialog, optionally pre-filling both input fields."""
        super().__init__(parent)
        # Always defined, so callers can read it even when the dialog is cancelled.
        self.user_playlists = []
        self.setWindowTitle("DS-IPTV Login")
        self.setFixedWidth(300)
        layout = QVBoxLayout(self)
        layout.addWidget(QLabel("Enter your username and password:"))

        self.username = QLineEdit(self)
        self.username.setPlaceholderText("Username")
        self.username.setText(preset_username)
        layout.addWidget(self.username)

        self.password = QLineEdit(self)
        self.password.setPlaceholderText("Password")
        self.password.setEchoMode(QLineEdit.Password)
        self.password.setText(preset_password)
        layout.addWidget(self.password)

        button_box = QDialogButtonBox(QDialogButtonBox.Ok | QDialogButtonBox.Cancel)
        button_box.accepted.connect(self.handle_login)
        button_box.rejected.connect(self.reject)
        layout.addWidget(button_box)

    def handle_login(self):
        """Validate the inputs, authenticate, and accept the dialog on success.

        On success the credentials are saved and ``self.user_playlists`` is
        filled from the ``"playlists"`` key of the server reply.
        """
        user = self.username.text().strip()
        pwd = self.password.text().strip()
        if not user or not pwd:
            QMessageBox.warning(self, "Input Required", "Please enter both username and password.")
            return
        # NOTE: this call blocks the UI thread for up to the request timeout.
        result = AuthManager.authenticate(user, pwd)
        if result.get("status") == "ok":
            AuthManager.save_credentials(user, pwd)
            self.user_playlists = result.get("playlists", [])
            self.accept()
        else:
            QMessageBox.critical(self, "Login Failed", result.get("message", "Unknown error"))
class EPGGridWindow(QMainWindow):
def __init__(self, epg_data, channel_order, channel_video_info, main_window=None, channel_logos=None):
    """Build the EPG grid window: geometry, timeline, scenes/views, controls and timers.

    Args:
        epg_data: stored on ``self.epg_data``.
        channel_order: sequence of ``(channel_name, channel_id)`` pairs, one per row.
        channel_video_info: mapping of channel name to ``(resolution, fps)``.
        main_window: owning player window, used for playlists, logos and playback.
        channel_logos: accepted for interface compatibility; not used here.
    """
    super().__init__()
    self.setWindowTitle("Playlist with Electronic Programme Guide (EPG)")
    control_height = 50  # height of the bottom search/button bar

    # Snap the window to the left half of the primary screen's usable area.
    screen = QGuiApplication.primaryScreen()
    available_rect = screen.availableGeometry()
    left = available_rect.left() - 1
    top = available_rect.top()
    width = available_rect.width() // 2 - 3
    height = available_rect.height() - control_height + 12
    self.setWindowFlags(self.windowFlags() | Qt.Window)
    self.showNormal()
    self.move(left, top)
    self.resize(width, height)

    self.epg_data = epg_data
    self.channel_order = channel_order
    self.channel_video_info = channel_video_info
    self.main_window = main_window
    self.full_channel_order = list(self.channel_order)  # unfiltered copy used by search
    self.channel_column_width = 310
    self.timeline_slot_width = 150  # pixels per 30-minute slot
    self.row_height = 50
    self.timeline_row_height = 60

    # Align "now" down to :00 or :30.
    now = datetime.now(pytz.timezone("Asia/Karachi"))
    aligned_minutes = 0 if now.minute < 30 else 30
    aligned_now = now.replace(minute=aligned_minutes, second=0, microsecond=0)
    # Timeline runs from one day (plus 15 minutes) before to two days after the aligned time.
    self.timeline_start = aligned_now - timedelta(days=1, minutes=15)
    self.timeline_end = aligned_now + timedelta(days=2)
    # One entry per 30-minute slot. (This list used to be built twice in a row.)
    self.timeline = []
    slot = self.timeline_start
    while slot <= self.timeline_end:
        self.timeline.append(slot)
        slot += timedelta(minutes=30)

    self.timeline_scene = QGraphicsScene()
    self.channel_scene = QGraphicsScene()
    self.channel_logo_items = {}  # ch_name -> QGraphicsPixmapItem
    self.grid_scene = QGraphicsScene()
    self.timeline_scene.installEventFilter(self)
    self.channel_scene.installEventFilter(self)
    self.grid_scene.installEventFilter(self)

    self.timeline_view = QGraphicsView(self.timeline_scene)
    self.timeline_view.setFixedHeight(self.timeline_row_height)
    self.channel_view = QGraphicsView(self.channel_scene)
    self.grid_view = QGraphicsView(self.grid_scene)
    self.grid_view.setStyleSheet("border: none;")
    self.channel_view.setStyleSheet("border-right: 1px solid #666;")
    # Only this stylesheet is applied to the timeline view; an earlier
    # "border: none;" call was immediately overwritten by it.
    self.timeline_view.setStyleSheet("border-bottom: 2px solid #666;")

    # Keep the three views scrolling together.
    self.grid_view.horizontalScrollBar().valueChanged.connect(self.timeline_view.horizontalScrollBar().setValue)
    self.grid_view.verticalScrollBar().valueChanged.connect(self.sync_vertical_scroll)
    self.channel_view.verticalScrollBar().valueChanged.connect(self.sync_vertical_scroll)
    self.timeline_view.setVerticalScrollBarPolicy(Qt.ScrollBarAlwaysOff)
    self.timeline_view.setHorizontalScrollBarPolicy(Qt.ScrollBarAlwaysOff)
    self.channel_view.setVerticalScrollBarPolicy(Qt.ScrollBarAlwaysOff)

    container = QWidget()
    layout = QGridLayout(container)
    layout.setSpacing(0)
    layout.setContentsMargins(0, 0, 0, 0)

    self.clock_label = QLabel()
    self.clock_label.setAlignment(Qt.AlignCenter)
    self.clock_label.setFixedSize(self.channel_column_width, self.timeline_row_height)
    self.clock_label.setStyleSheet("""
background-color: #222;
color: white;
font-size: 10pt;
font-weight: bold;
border: none;
""")
    layout.addWidget(self.clock_label, 0, 0)
    layout.addWidget(self.timeline_view, 0, 1)
    layout.addWidget(self.channel_view, 1, 0)
    layout.addWidget(self.grid_view, 1, 1)
    layout.setRowStretch(0, 0)  # timeline
    layout.setRowStretch(1, 1)  # grid

    control_layout = QHBoxLayout()
    self.search_box = QLineEdit()
    self.search_box.setPlaceholderText("Search channel...")
    self.search_box.setFixedWidth(250)
    self.search_box.setFixedHeight(control_height)
    self.search_box.setStyleSheet("font-size: 14px; padding: 5px;")
    # Debounce: filter only once typing has paused for 200 ms.
    self.search_box_timer = QTimer()
    self.search_box_timer.setSingleShot(True)
    self.search_box_timer.timeout.connect(lambda: self.filter_channels(self.search_box.text()))
    self.search_box.textChanged.connect(lambda _: self.search_box_timer.start(200))
    control_layout.addWidget(self.search_box)

    # NOTE(review): the leading glyphs in the button labels below look like
    # mis-decoded emoji - confirm the file encoding.
    self.clear_search_btn = QPushButton("โClear Search")
    self.clear_search_btn.setFixedSize(105, control_height)
    self.clear_search_btn.setToolTip("Clear search")
    self.clear_search_btn.clicked.connect(lambda: self.search_box.setText(""))
    control_layout.addWidget(self.clear_search_btn)

    # Stretch separates the search controls from the action buttons.
    control_layout.addStretch(1)

    self.jump_now_btn = QPushButton("โฐ Jump to Now")
    self.jump_now_btn.setFixedHeight(control_height)
    self.jump_now_btn.clicked.connect(self.scroll_to_current_time)
    control_layout.addWidget(self.jump_now_btn)

    self.custom_epg_btn = QPushButton("๐ Add Custom EPG")
    self.custom_epg_btn.setFixedHeight(control_height)
    self.custom_epg_btn.clicked.connect(self.load_custom_epg)
    control_layout.addWidget(self.custom_epg_btn)

    self.update_epg_btn = QPushButton("๐ Update EPG/Logos")
    self.update_epg_btn.setFixedHeight(control_height)
    self.update_epg_btn.clicked.connect(self.update_all_epg_and_logos)
    control_layout.addWidget(self.update_epg_btn)

    self.clear_epg_btn = QPushButton("๐งน Clear EPG/Cache")
    self.clear_epg_btn.setFixedHeight(control_height)
    self.clear_epg_btn.clicked.connect(self.clear_epg)
    control_layout.addWidget(self.clear_epg_btn)

    layout.addLayout(control_layout, 2, 0, 1, 2, alignment=Qt.AlignLeft)
    self.setCentralWidget(container)

    self.build_timeline()
    self.build_channels()
    self.build_blank_program_grid()  # placeholder grid so no white background shows
    self.draw_now_line()
    QTimer.singleShot(300, self.build_program_grid)

    self.clock_timer = QTimer(self)
    self.clock_timer.timeout.connect(self.update_clock)
    self.clock_timer.start(1000)

    self.now_line_timer = QTimer(self)
    self.now_line_timer.timeout.connect(self.update_now_line)
    self.now_line_timer.start(30000)

    self.setup_now_autorefresh()
    QTimer.singleShot(100, self.scroll_to_current_time)
def build_blank_program_grid(self):
    """Fill the grid scene with dark placeholder cells, one per channel row and timeline slot."""
    self.grid_scene.clear()
    row_count = len(self.channel_order)
    col_count = len(self.timeline)
    cell_w = self.timeline_slot_width
    cell_h = self.row_height - 1
    for r in range(row_count):
        top = r * self.row_height
        for c in range(col_count):
            cell = QGraphicsRectItem(c * self.timeline_slot_width, top, cell_w, cell_h)
            cell.setBrush(QBrush(QColor("#222")))  # darker placeholder
            cell.setPen(QPen(QColor("#111")))
            self.grid_scene.addItem(cell)
    self.grid_scene.setSceneRect(
        0, 0, col_count * self.timeline_slot_width, row_count * self.row_height
    )
def build_channels(self):
    """Rebuild the channel column: background, logo, number, name and video info per row.

    Each row's background rect stores the visual row index in data slot 0 and
    the channel's position in the unfiltered list in data slot 1.
    """
    print(f"[EPGGrid] Building {len(self.channel_order)} channel rows")
    self.channel_scene.clear()
    self.channel_logo_items.clear()

    # First position of every channel in the unfiltered list, computed once
    # instead of calling list.index() for each row.
    first_index = {}
    for idx, entry in enumerate(self.full_channel_order):
        first_index.setdefault(tuple(entry), idx)

    # main_window defaults to None in the constructor; logos are then unavailable.
    playlist = self.main_window.current_playlist if self.main_window else None
    logos = playlist.logos if playlist else None

    gap_after_num = 8
    gap_after_logo = 8
    logo_width = 40
    logo_height = 30
    logo_area_width = logo_width
    logo_area_height = self.row_height
    logo_x = 10

    for row, (ch_name, ch_id) in enumerate(self.channel_order):
        y = row * self.row_height
        # Channel number comes from the unfiltered order; fall back to the visual row.
        original_index = first_index.get((ch_name, ch_id), row)

        # Background rectangle for the row
        rect = QGraphicsRectItem(0, y, self.channel_column_width, self.row_height)
        rect.setBrush(QBrush(QColor("#444") if row % 2 == 0 else QColor("#333")))
        rect.setPen(QPen(QColor("#222")))
        rect.setAcceptHoverEvents(True)
        rect.setAcceptedMouseButtons(Qt.LeftButton)  # required for double-click
        rect.setFlag(QGraphicsRectItem.ItemIsSelectable, True)
        rect.setData(0, row)  # visual row index
        rect.setData(1, original_index)  # channel number (for playback mapping)
        self.channel_scene.addItem(rect)

        logo_y = y
        text_y = y + 4  # channel number/name sit near the top of the row

        logo_url = logos.get(ch_name) if logos is not None else None
        logo_pixmap = load_logo(logo_url) if logo_url else None
        if logo_pixmap:
            scaled_pixmap = logo_pixmap.scaled(logo_width, logo_height, Qt.KeepAspectRatio, Qt.SmoothTransformation)
            logo_item = QGraphicsPixmapItem(scaled_pixmap)
            # Centre the scaled logo inside its reserved area.
            offset_x = logo_x + (logo_area_width - scaled_pixmap.width()) / 2
            offset_y = logo_y + (logo_area_height - scaled_pixmap.height()) / 2
            logo_item.setPos(offset_x, offset_y)
            self.channel_scene.addItem(logo_item)
            # Keep a reference so the logo can be updated later.
            self.channel_logo_items[ch_name] = logo_item
            logo_end_x = logo_x + logo_area_width + gap_after_logo
        else:
            logo_end_x = logo_x

        # Channel number
        num_item = QGraphicsSimpleTextItem(f"{original_index + 1}.")
        num_item.setFont(QFont("Arial", 10, QFont.Bold))
        num_item.setBrush(QColor("white"))
        num_item.setPos(logo_end_x, text_y)
        self.channel_scene.addItem(num_item)

        # Channel name, placed right after the number
        num_end_x = logo_end_x + num_item.boundingRect().width() + gap_after_num
        name_item = QGraphicsSimpleTextItem(ch_name)
        name_item.setFont(QFont("Arial", 10, QFont.Bold))
        name_item.setBrush(QColor("white"))
        name_item.setPos(num_end_x, text_y)
        self.channel_scene.addItem(name_item)

        # Resolution and FPS (bottom right)
        res, fps = self.channel_video_info.get(ch_name, ("NA", "NA"))
        info_item = QGraphicsSimpleTextItem(f"({res}, {fps} FPS)")
        info_item.setFont(QFont("Arial", 9))
        info_item.setBrush(QColor("#CCCCCC"))
        info_width = info_item.boundingRect().width()
        info_item.setPos(self.channel_column_width - info_width - 10, y + self.row_height / 2 + 2)
        self.channel_scene.addItem(info_item)
def build_program_grid(self):
    """Draw programme blocks for every visible channel, padding gaps with placeholders.

    Programmes come from ``EPGStore``; their start/stop stamps are read as
    UTC (only the first 14 characters are parsed) and shown in Asia/Karachi time.
    """
    # NOTE(review): any UTC offset suffix in the start/stop attributes is ignored - confirm.
    local_tz = pytz.timezone("Asia/Karachi")

    # Group programmes by channel id in one pass instead of rescanning the
    # whole tree for every row.
    by_channel = {}
    for prog in EPGStore.get().findall(".//programme"):
        by_channel.setdefault(prog.get("channel"), []).append(prog)

    for row, (ch_name, ch_id) in enumerate(self.channel_order):
        # Collect (start, stop, title) for programmes overlapping the timeline.
        blocks = []
        for prog in by_channel.get(ch_id, ()):
            try:
                start = datetime.strptime(prog.get("start")[:14], "%Y%m%d%H%M%S").replace(tzinfo=pytz.utc).astimezone(local_tz)
                stop = datetime.strptime(prog.get("stop")[:14], "%Y%m%d%H%M%S").replace(tzinfo=pytz.utc).astimezone(local_tz)
                if stop <= self.timeline_start or start >= self.timeline_end:
                    continue
                raw_title = prog.findtext("title", "No Title")
                blocks.append((start, stop, self.decode_epg_text(raw_title)))
            except (TypeError, ValueError, OverflowError) as e:
                # Missing or malformed timestamps: skip this programme only.
                print(f"[ERROR] Parsing program failed: {e}")
        blocks.sort(key=lambda x: x[0])

        current_time = self.timeline_start
        for start, stop, title in blocks:
            # Fill any gap before this programme
            if start > current_time:
                self._fill_program_gap(row, current_time, start)
            # Draw the real programme, clamped to the timeline
            start_offset = max(0, int((start - self.timeline_start).total_seconds() // 1800))
            end_offset = min(len(self.timeline), int((stop - self.timeline_start).total_seconds() // 1800))
            duration = max(1, end_offset - start_offset)
            self.draw_program_block(row, start_offset, duration, title)
            current_time = stop

        # Fill the gap after the last programme up to timeline_end
        if current_time < self.timeline_end:
            self._fill_program_gap(row, current_time, self.timeline_end)

def _fill_program_gap(self, row, gap_start, gap_end):
    """Cover [gap_start, gap_end) on *row* with "No Program Information" blocks of up to one hour."""
    step = timedelta(minutes=60)
    while gap_start < gap_end:
        next_gap = min(gap_end, gap_start + step)
        start_offset = int((gap_start - self.timeline_start).total_seconds() // 1800)
        duration = int((next_gap - gap_start).total_seconds() // 1800)
        # Gaps shorter than one slot round down to zero width; drawing them would
        # only add invisible items, so they are skipped.
        if duration > 0:
            self.draw_program_block(row, start_offset, duration, "No Program Information")
        gap_start = next_gap
def build_timeline(self):
    """Draw the header row: one background block and a date/time label per timeline slot."""
    # Blocks are drawn half a slot (15 minutes) to the left of their column.
    shift_px = self.timeline_slot_width // 2
    for index, t in enumerate(self.timeline):
        x = index * self.timeline_slot_width - shift_px

        rect = QGraphicsRectItem(x, 0, self.timeline_slot_width, self.timeline_row_height)
        rect.setBrush(QBrush(QColor("#333")))
        rect.setPen(QPen(QColor("#333")))
        self.timeline_scene.addItem(rect)

        # Label with the slot start plus 15 minutes.
        label_time = t + timedelta(minutes=15)
        date_str = label_time.strftime("%a %b %d")  # e.g. "Fri Jun 07"
        time_str = label_time.strftime("%I:%M %p").lstrip("0")  # e.g. "3:30 PM"

        label = QGraphicsTextItem()
        label.setDefaultTextColor(QColor("white"))
        # HTML collapses plain newlines, so an explicit <br> is needed to keep
        # the date and the time on separate lines.
        # NOTE(review): markup is a minimal reconstruction - confirm intended styling.
        label.setHtml(f"{date_str}<br>{time_str}")
        text_rect = label.boundingRect()
        # Centre the label inside its block.
        label.setPos(
            x + (self.timeline_slot_width - text_rect.width()) / 2,
            (self.timeline_row_height - text_rect.height()) / 2,
        )
        self.timeline_scene.addItem(label)
def channel_double_click_handler(self, graphics_item):
    """Select and play the channel whose number is stored on *graphics_item*.

    Data slot 0 holds the visual row and slot 1 the channel number; the match
    is made against ``Qt.UserRole + 2`` of the main window's channel list items.
    """
    row = graphics_item.data(0)
    if row is None or not self.main_window:
        return
    channel_list = self.main_window.channel_list
    if not (0 <= row < channel_list.count()):
        return
    channel_number = graphics_item.data(1)
    if channel_number is None:
        return
    for i in range(channel_list.count()):
        candidate = self.main_window.channel_list.item(i)
        if candidate.data(Qt.UserRole + 2) == channel_number:
            self.main_window.channel_list.setCurrentItem(candidate)
            self.main_window.on_channel_clicked(candidate)
            break
def clear_epg(self):
    """Reset the EPG to placeholder data and wipe the logo and custom-source caches.

    The saved login file is kept: it lives inside CACHE_DIR, so removing the
    whole directory would also log the user out.
    """
    EPGStore.clear()
    # Reset view with dummy-only blocks
    self.main_window.insert_dummy_epg_gaps()
    self.update_epg(self.channel_order, self.channel_video_info)
    print("[EPG] Cleared to dummy state")

    # Clear the logo cache folder, entry by entry, skipping the credentials file.
    try:
        import shutil
        os.makedirs(CACHE_DIR, exist_ok=True)
        keep = os.path.abspath(CRED_FILE)
        for entry in os.listdir(CACHE_DIR):
            path = os.path.join(CACHE_DIR, entry)
            if os.path.abspath(path) == keep:
                continue
            if os.path.isdir(path) and not os.path.islink(path):
                shutil.rmtree(path)
            else:
                os.remove(path)
        print("[CACHE] Logo cache cleared.")
    except Exception as e:
        print(f"[CACHE] Failed to clear logo cache: {e}")

    # Clear custom EPG source list
    self.main_window.custom_epg_sources = []

    # Delete saved EPG cache file
    epg_cache_file = os.path.join(self.main_window.php_dir, "custom_epg_sources.txt")
    if os.path.exists(epg_cache_file):
        try:
            os.remove(epg_cache_file)
            print("[CACHE] Custom EPG source cache cleared.")
        except Exception as e:
            print(f"[CACHE] Failed to delete EPG cache file: {e}")

    self.main_window.channel_logos.clear()
    self.refresh_channel_logos()
    self.main_window.current_logo_pixmap = None
    self.main_window.update_epg_info(self.main_window.current_channel_name, self.main_window.current_channel_resolution)
def decode_epg_text(self, text):
    """Return *text* with percent-escapes decoded; falsy input yields an empty string."""
    return unquote(text) if text else ""
def draw_now_line(self):
    """Create the red current-time line, add it to the grid scene and position it."""
    line = QGraphicsLineItem()
    line.setPen(QPen(QColor("red"), 2))
    self.now_line = line
    self.grid_scene.addItem(line)
    self.update_now_line()
def draw_program_block(self, row, col_start, duration, title):
    """Draw one programme cell on the grid scene and overlay its title text.

    Args:
        row: visual row index into ``self.channel_order``.
        col_start: first 30-minute slot (column) covered by the block.
        duration: number of 30-minute slots the block spans.
        title: programme title; newlines become HTML line breaks.

    Titles taller than the cell are scrolled up and down by a QTimer that is
    kept in ``self._scroll_timers``.
    """
    from html import escape  # local import: only needed to build the title markup

    x = col_start * self.timeline_slot_width
    y = row * self.row_height
    w = duration * self.timeline_slot_width
    h = self.row_height - 1
    bg_color = "#444" if row % 2 == 0 else "#333"

    rect = QGraphicsRectItem(x, y, w, h)
    rect.setBrush(QBrush(QColor(bg_color)))
    rect.setPen(QPen(QColor("#222")))
    rect.setAcceptHoverEvents(True)
    rect.setAcceptedMouseButtons(Qt.LeftButton)
    rect.setData(0, row)
    try:
        channel_number = self.full_channel_order.index(self.channel_order[row])
    except ValueError:
        channel_number = row  # not in the unfiltered list: fall back to the visual row
    rect.setData(1, channel_number)
    self.grid_scene.addItem(rect)
    rect.channel_index = row

    repeat_width = self.timeline_slot_width * 2  # 1 hour = 2 x 30min slots
    if not hasattr(self, "_scroll_timers"):
        self._scroll_timers = []

    now = datetime.now(pytz.timezone("Asia/Karachi"))
    start_time = self.timeline_start + timedelta(minutes=30 * col_start)
    stop_time = self.timeline_start + timedelta(minutes=30 * (col_start + duration))
    # A single right-aligned title is used only when the block spans exactly two
    # slots, ends within the next 30 minutes, and did not start within the last 30.
    # NOTE(review): an earlier comment said "duration >= 2"; the code tests == 2 - confirm.
    starts_within_30_mins = (now - start_time).total_seconds() <= 1800
    ends_within_30_mins = (stop_time - now).total_seconds() <= 1800
    ends_soon = (
        duration == 2 and
        ends_within_30_mins and
        not starts_within_30_mins
    )

    # The title is escaped because it comes from EPG data and is rendered as rich
    # text. The replacement is done outside the f-string: a backslash inside an
    # f-string expression is a syntax error before Python 3.12.
    title_markup = escape(title or "", quote=False).replace("\n", "<br>")

    def create_text_block(tx, tw, align="center"):
        # Invisible clipping container so long titles cannot spill out of the cell.
        container = QGraphicsRectItem(tx, y, tw, h)
        container.setPen(QPen(Qt.NoPen))
        container.setBrush(QBrush(Qt.NoBrush))
        container.setFlag(QGraphicsItem.ItemClipsChildrenToShape, True)
        self.grid_scene.addItem(container)

        text_item = QGraphicsTextItem(container)
        text_item.setDefaultTextColor(QColor("white"))
        text_item.setFont(QFont("Arial", 9))
        text_item.setTextWidth(tw)
        # With a fixed text width the horizontal alignment has to live in the markup.
        # NOTE(review): the wrapper tag is a reconstruction - confirm intended styling.
        text_item.setHtml(f'<div align="{align}">{title_markup}</div>')
        text_rect = text_item.boundingRect()

        # Horizontal offset
        if align == "left":
            offset_x = 5
        elif align == "right":
            offset_x = max(0, tw - text_rect.width() - 5)
        else:  # center
            offset_x = max(5, (tw - text_rect.width()) / 2)

        # Vertical offset: centre when the text fits, otherwise start at the top.
        if text_rect.height() <= h:
            offset_y = y + (h - text_rect.height()) / 2
        else:
            offset_y = y
        text_item.setPos(tx + offset_x, offset_y)

        if text_rect.height() > h:
            # Text is taller than the cell: bounce it vertically, pausing 2 s at each end.
            scroll_offset = text_rect.height() - h
            scroll_speed = 10  # pixels per second
            interval_ms = 50
            pixels_per_tick = scroll_speed * (interval_ms / 1000)
            scroll_state = {
                "base_x": tx + offset_x,
                "y_pos": offset_y,
                "max_offset": offset_y - scroll_offset,
                "waiting": False,
                "direction": -1,
            }

            def scroll_text(text_item=text_item, state=scroll_state):
                if state["waiting"]:
                    return
                state["y_pos"] += state["direction"] * pixels_per_tick
                if state["direction"] == -1 and state["y_pos"] < state["max_offset"]:
                    state["y_pos"] = state["max_offset"]
                    state["waiting"] = True
                    QTimer.singleShot(2000, lambda: change_direction(1, state))
                elif state["direction"] == 1 and state["y_pos"] > offset_y:
                    state["y_pos"] = offset_y
                    state["waiting"] = True
                    QTimer.singleShot(2000, lambda: change_direction(-1, state))
                text_item.setPos(state["base_x"], state["y_pos"])

            def change_direction(new_direction, state):
                state["direction"] = new_direction
                state["waiting"] = False

            timer = QTimer()
            timer.timeout.connect(lambda: scroll_text())
            timer.start(interval_ms)

            def on_text_item_destroyed():
                timer.stop()

            # Stop scrolling once the text item is destroyed.
            text_item.destroyed.connect(on_text_item_destroyed)
            self._scroll_timers.append(timer)

    if ends_soon:
        # Just one right-aligned title at the end
        create_text_block(x + w - repeat_width, repeat_width, align="right")
    else:
        # Repeat the title every hour
        num_repeats = max(1, int(w // repeat_width))
        for i in range(num_repeats):
            tx = x + i * repeat_width
            tw = min(repeat_width, x + w - tx)
            create_text_block(tx, tw, align="center")
def eventFilter(self, watched, event):
    """Handle hover highlighting and double-click-to-play for rows in the watched scenes.

    Returns True when the event was consumed by a row rectangle, else False.
    """
    handled_types = (
        QEvent.GraphicsSceneHoverEnter,
        QEvent.GraphicsSceneHoverLeave,
        QEvent.GraphicsSceneMouseDoubleClick,
    )
    event_type = event.type()
    if event_type not in handled_types or not hasattr(event, "scenePos"):
        return False

    for item in watched.items(event.scenePos()):
        if not isinstance(item, QGraphicsRectItem):
            continue
        row = item.data(0)
        if row is None:
            # Rects without row data (text clipping containers, placeholder cells)
            # are not rows. They used to swallow the event and made the
            # hover-leave branch evaluate ``None % 2``.
            continue

        if event_type == QEvent.GraphicsSceneHoverEnter:
            item.setBrush(QBrush(QColor("#555")))
            return True
        if event_type == QEvent.GraphicsSceneHoverLeave:
            # Restore the alternating row colour.
            color = "#444" if row % 2 == 0 else "#333"
            item.setBrush(QBrush(QColor(color)))
            return True

        # Double-click: play the channel this row maps to.
        channel_number = item.data(1)
        if channel_number is not None and self.main_window:
            if self.main_window.current_playlist:
                try:
                    ch_name, ch_id = self.main_window.current_playlist.channels[channel_number]
                    channel_list = self.main_window.channel_list
                    for i in range(channel_list.count()):
                        list_item = channel_list.item(i)
                        # NOTE(review): prefix match can pick a different channel
                        # whose name starts with the same text - confirm.
                        if list_item.text().startswith(ch_name):
                            channel_list.setCurrentItem(list_item)
                            self.main_window.on_channel_clicked(list_item)
                            break
                except IndexError:
                    pass
        return True
    return False
def filter_channels(self, text):
    """Show only channels whose name contains *text* (case-insensitive); empty text shows all."""
    needle = text.strip().lower()
    if needle == "":
        self.channel_order = self.full_channel_order  # reset to the full list
    else:
        self.channel_order = [
            ch for ch in self.full_channel_order if needle in ch[0].lower()
        ]

    # Stop title-scroll timers before their text items are destroyed, as
    # update_epg() does; otherwise the timer list grows with every search.
    timers = getattr(self, "_scroll_timers", None)
    if timers:
        for t in timers:
            t.stop()
        timers.clear()

    # Rebuild only the visible rows
    self.channel_scene.clear()
    self.grid_scene.clear()
    self.build_channels()
    self.build_program_grid()
    self.draw_now_line()

    # Resize both scenes to the filtered row count so the views do not keep
    # scrolling into empty space left over from the full list.
    scene_height = len(self.channel_order) * self.row_height
    scene_width = len(self.timeline) * self.timeline_slot_width
    self.grid_scene.setSceneRect(0, 0, scene_width, scene_height)
    self.channel_scene.setSceneRect(0, 0, self.channel_column_width, scene_height)

    # Reset scroll so the filtered result starts at the top
    self.channel_view.verticalScrollBar().setValue(0)
    self.grid_view.verticalScrollBar().setValue(0)
def load_custom_epg(self):
    """Ask for a custom EPG source (URL or local .xml/.gz file) and merge it into EPGStore.

    A new source is added to ``main_window.custom_epg_sources``, loaded without
    clearing existing EPG data, and appended to ``custom_epg_sources.txt`` in the
    main window's ``php_dir``. A source that is already registered is not loaded
    again, so its programmes and cache line are not duplicated.
    """
    dialog = QDialog(self)
    dialog.setWindowTitle("Add Custom EPG Source")
    layout = QVBoxLayout(dialog)
    label = QLabel("Enter EPG URL or click Browse to select a file (.xml or .gz):")
    layout.addWidget(label)

    h_layout = QHBoxLayout()
    url_input = QLineEdit()
    browse_btn = QPushButton("Browse")
    h_layout.addWidget(url_input)
    h_layout.addWidget(browse_btn)
    layout.addLayout(h_layout)

    format_hint = QLabel("Accepted formats: .xml, .gz")
    format_hint.setStyleSheet("color: gray; font-size: 10pt;")
    layout.addWidget(format_hint)

    button_box = QDialogButtonBox(QDialogButtonBox.Ok | QDialogButtonBox.Cancel)
    layout.addWidget(button_box)

    def on_browse():
        file_path, _ = QFileDialog.getOpenFileName(self, "Select EPG File", "", "EPG Files (*.xml *.gz)")
        if file_path:
            url_input.setText(file_path)

    browse_btn.clicked.connect(on_browse)

    def read_programmes(source):
        """Return the <programme> children of the XMLTV root found at *source*."""
        if source.startswith(("http://", "https://")):
            response = requests.get(source, timeout=10)
            response.raise_for_status()
            raw = response.content
        else:
            with open(source, "rb") as f:
                raw = f.read()
        # Detect gzip by its magic number instead of try/except or file extension.
        if raw[:2] == b"\x1f\x8b":
            raw = gzip.decompress(raw)
        # Parse bytes so the XML parser honours the declared encoding and any BOM.
        return ET.fromstring(raw).findall("programme")

    def on_accept():
        source = url_input.text().strip()
        if source and source not in self.main_window.custom_epg_sources:
            self.main_window.custom_epg_sources.append(source)
            self.main_window.log_debug(f"Added custom EPG source: {source}")
            # Load only this newly added source without clearing EPGStore
            try:
                for programme in read_programmes(source):
                    EPGStore.append(programme)
                self.main_window.insert_dummy_epg_gaps()
                self.main_window.update_current_epg()
                self.main_window.update_epg_info(self.main_window.current_channel_name, self.main_window.current_channel_resolution)
                self.update_epg(self.channel_order, self.channel_video_info)
            except Exception as e:
                self.main_window.log_debug(f"[Custom EPG] Failed to load from {source}: {e}")
            # Persist the source so it is known on the next start.
            epg_cache_file = os.path.join(self.main_window.php_dir, "custom_epg_sources.txt")
            try:
                with open(epg_cache_file, "a", encoding="utf-8") as f:
                    f.write(source + "\n")
            except Exception as e:
                print(f"[CACHE] Failed to save custom EPG source: {e}")
        dialog.accept()

    button_box.accepted.connect(on_accept)
    button_box.rejected.connect(dialog.reject)
    dialog.exec_()
def refresh_channel_fps_info(self):
    """Redraw the channel column so updated resolution/FPS values are shown."""
    scene = self.channel_scene
    scene.clear()
    self.build_channels()
def refresh_channel_logos(self):
    """Redraw the channel column so the current set of logos is shown."""
    scene = self.channel_scene
    scene.clear()
    self.build_channels()
def scroll_to_current_time(self):
    """Scroll the grid horizontally so the current half-hour slot is in view."""
    # 30 minutes are added to match the shifted timeline.
    shifted = datetime.now(pytz.timezone("Asia/Karachi")) + timedelta(minutes=30)
    # Snap to the start of that 30-minute block.
    half_hour = 30 if shifted.minute >= 30 else 0
    slot_start = shifted.replace(minute=half_hour, second=0, microsecond=0)
    # Convert elapsed time since the timeline start into an X coordinate.
    elapsed_minutes = (slot_start - self.timeline_start).total_seconds() / 60
    x = (elapsed_minutes / 30) * self.timeline_slot_width
    target = int(x - self.channel_column_width) + 40
    self.grid_view.horizontalScrollBar().setValue(max(target, 0))
def setup_now_autorefresh(self):
    """Schedule ``start_now_timer`` for the next :00 or :30 wall-clock boundary."""
    now = datetime.now()
    # Start of the current half-hour, then one half-hour forward. Using timedelta
    # handles the 23:30 -> 00:00 rollover; replacing the hour with 0 kept today's
    # date, gave a time in the past, and made the timer fire immediately.
    half_hour_start = now.replace(minute=30 if now.minute >= 30 else 0, second=0, microsecond=0)
    next_run = half_hour_start + timedelta(minutes=30)
    delay_ms = max(0, int((next_run - now).total_seconds() * 1000))
    QTimer.singleShot(delay_ms, self.start_now_timer)
def start_now_timer(self):
    """Jump to the current time now, then repeat every 30 minutes."""
    self.trigger_jump_to_now()
    half_hour_ms = 30 * 60 * 1000
    # Started from the aligned single-shot, so later ticks stay on the boundary.
    timer = QTimer()
    self._now_timer = timer
    timer.timeout.connect(self.trigger_jump_to_now)
    timer.start(half_hour_ms)
def sync_vertical_scroll(self, value):
    """Apply the same vertical scroll *value* to the channel column and the grid."""
    for view in (self.channel_view, self.grid_view):
        view.verticalScrollBar().setValue(value)
def trigger_jump_to_now(self):
    """Timer callback: log the auto-jump and scroll the grid to the current time."""
    message = "[EPG Grid] Auto-jumping to current time (on clock boundary)"
    print(message)
    self.scroll_to_current_time()
def update_all_epg_sources(self):
    """Reload every default and custom EPG source into EPGStore and refresh the UI.

    The store is cleared first only when at least one source is configured.
    A source that fails to load is reported and skipped.
    """
    if not self.main_window:
        print("[EPG] Main window reference is missing.")
        return
    print("[EPG] Refreshing all default and custom EPG sources...")

    # Only clear if we know we have EPG sources to re-download
    if self.main_window.default_epg_urls or self.main_window.custom_epg_sources:
        EPGStore.clear()

    # Download and merge all default EPGs
    for url in self.main_window.default_epg_urls:
        print(f"[EPG] Downloading default EPG: {url}")
        try:
            for programme in self._load_epg_programmes(url):
                EPGStore.append(programme)
        except Exception as e:
            print(f"[EPG] Failed to download from {url}: {e}")

    # Load all custom EPGs (local files or URLs)
    for source in self.main_window.custom_epg_sources:
        print(f"[Custom EPG] Loading: {source}")
        try:
            for programme in self._load_epg_programmes(source):
                EPGStore.append(programme)
        except Exception as e:
            print(f"[Custom EPG] Failed to load from {source}: {e}")

    self.main_window.epg_data = EPGStore.get()

    # Fill gaps and update UI
    self.main_window.insert_dummy_epg_gaps()
    self.main_window.update_current_epg()
    self.main_window.update_epg_info(self.main_window.current_channel_name, self.main_window.current_channel_resolution)
    self.update_epg(self.channel_order, self.channel_video_info)

    total_programs = len(EPGStore.get().findall(".//programme"))
    print(f"[EPG] Update complete. Total programs: {total_programs}")
    self.main_window.log_debug(f"EPG updated: {total_programs} entries")

@staticmethod
def _load_epg_programmes(source):
    """Return the <programme> children of the XMLTV document at *source* (URL or file path)."""
    if source.startswith(("http://", "https://")):
        response = requests.get(source, timeout=10)
        response.raise_for_status()
        raw = response.content
    else:
        with open(source, "rb") as f:
            raw = f.read()
    # Detect gzip by its magic number rather than a bare try/except or the extension.
    if raw[:2] == b"\x1f\x8b":
        raw = gzip.decompress(raw)
    # Parse bytes so the XML parser honours the declared encoding and any BOM.
    return ET.fromstring(raw).findall("programme")
def update_all_epg_and_logos(self):
    """Refresh all EPG sources, then rebuild the logo URL map and start prefetching logos."""
    self.update_all_epg_sources()
    if not self.main_window:
        # update_all_epg_sources() already reported the missing main window;
        # continuing would raise AttributeError below.
        return

    # Rebuild channel_logos from the channel list
    self.main_window.channel_logos.clear()
    channel_list = self.main_window.channel_list
    for i in range(channel_list.count()):
        item = channel_list.item(i)
        # Item text before the first "(" is used as the channel name key.
        base = item.text().split("(")[0].strip()
        url = item.data(Qt.UserRole + 3)
        if url:
            self.main_window.channel_logos[base] = url

    # Start downloading logos
    if hasattr(self.main_window, 'start_logo_prefetch'):
        self.main_window.start_logo_prefetch()
def update_clock(self):
    """Show the current Asia/Karachi date and time in the clock label."""
    tz = pytz.timezone("Asia/Karachi")
    stamp = datetime.now(tz).strftime("%a %b %d %I:%M:%S %p")
    self.clock_label.setText(stamp)
def update_now_line(self):
    """Move the red line to the current time, or hide it when outside the timeline."""
    # 15 minutes are subtracted to align the line with the programme grid.
    marker = datetime.now(pytz.timezone("Asia/Karachi")) - timedelta(minutes=15)
    if not (self.timeline_start <= marker <= self.timeline_end):
        self.now_line.hide()
        return
    elapsed_minutes = (marker - self.timeline_start).total_seconds() / 60
    x = (elapsed_minutes / 30) * self.timeline_slot_width
    # The line spans every visible channel row.
    self.now_line.setLine(x, 0, x, len(self.channel_order) * self.row_height)
    self.now_line.show()
def update_epg(self, channel_order, channel_video_info):
    """Replace the channel list and video info, then rebuild every scene from scratch.

    Args:
        channel_order: new sequence of channel entries; also becomes the unfiltered list.
        channel_video_info: new mapping used for the per-channel resolution/FPS text.
    """
    print(f"[EPGGrid] update_epg() called with {len(channel_order)} channels")
    self.channel_order = channel_order
    self.full_channel_order = list(channel_order)  # keep the search list up to date
    self.channel_video_info = channel_video_info

    # Stop title-scroll timers before the items they move are destroyed.
    timers = getattr(self, "_scroll_timers", None)
    if timers is not None:
        for timer in timers:
            timer.stop()
        timers.clear()

    for scene in (self.timeline_scene, self.channel_scene, self.grid_scene):
        scene.clear()

    self.build_timeline()
    self.build_channels()
    self.build_program_grid()
    self.draw_now_line()

    # Force both scenes to the same height so the two views stay aligned.
    scene_height = len(self.channel_order) * self.row_height
    scene_width = len(self.timeline) * self.timeline_slot_width
    self.grid_scene.setSceneRect(0, 0, scene_width, scene_height)
    self.channel_scene.setSceneRect(0, 0, self.channel_column_width, scene_height)
    for view in (self.grid_view, self.channel_view):
        view.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
class EPGStore:
    """Class-level holder for the shared EPG XML tree."""

    # Start with an empty <tv> root so get() always returns an element.
    _tree = ET.Element("tv")

    @classmethod
    def set(cls, tree):
        """Replace the stored tree; ``None`` resets it to an empty ``<tv>`` root."""
        if tree is None:
            tree = ET.Element("tv")
        cls._tree = tree

    @classmethod
    def get(cls):
        """Return the currently stored tree."""
        return cls._tree

    @classmethod
    def append(cls, programme):
        """Append one programme element to the stored tree."""
        if cls._tree is None:
            return
        cls._tree.append(programme)

    @classmethod
    def clear(cls):
        """Discard the stored tree and start over with an empty ``<tv>`` root."""
        cls._tree = ET.Element("tv")
class IPTVWindow(QMainWindow):
def check_login(self):
    """Validate stored credentials, prompting with the login dialog when needed.

    Returns early (without the success message) if the user cancels the dialog.
    """
    creds = AuthManager.load_credentials()
    must_prompt = not creds

    if creds:
        auth = AuthManager.authenticate(creds['username'], creds['password'])
        if auth.get("status") != "ok":
            QMessageBox.critical(self, "Login Failed", auth.get("message", "Authentication error"))
            AuthManager.clear_credentials()
            must_prompt = True

    if must_prompt:
        dlg = LoginDialog(self)
        if dlg.exec_() != QDialog.Accepted:
            print("[INFO] User cancelled login. Running without playlists.")
            return

    # If we're here, login succeeded
    # TODO: download and load allowed playlists
    print("[INFO] Login successful")
def __init__(self, php_dir):
    """Create the main player window and an initially empty EPG grid window.

    Args:
        php_dir: Directory passed to the PHP helper server and used for
            saved playlists and EPG files.
    """
    # These two attributes are assigned ahead of the base-class constructor,
    # as in the original ordering.
    self.channel_video_info = {}
    self.epg_grid_window = None
    super().__init__()
    self.setWindowTitle("DSPT IPTV Player")
    self.setGeometry(100, 100, 1280, 720)
    self.php_dir = php_dir
    self._is_fullscreen = False
    self._normal_geometry = None
    self._normal_flags = None
    self.current_channel_epg = None
    # update_epg_info() reads this unconditionally; without a default the
    # first click on a channel that has no guide id raised AttributeError.
    self.next_program_epg = None
    self.epg_last_update = None
    self.debug_log = []
    self.playlists = []
    self.current_playlist = None
    self.logo_manager = None
    self.default_epg_urls = []  # Holds default EPG URLs from playlist(s)
    self.custom_epg_sources = []  # Holds both custom EPG files and URLs
    self.channel_order = []  # Prevent AttributeError
    self.full_channel_order = list(self.channel_order)
    # Overlay label for "Stream not available"; defined before any window is
    # shown so detect_resolution() can always read it.
    self.stream_error_label = None
    self.init_ui()

    # Get usable screen area
    screen = QGuiApplication.primaryScreen().availableGeometry()
    screen_left = screen.left()
    screen_top = screen.top()
    screen_width = screen.width()
    screen_height = screen.height()

    # Dimensions for top-right quadrant
    window_width = screen_width // 2
    # Shift down a bit to show title bar and avoid EPG overlap
    safe_margin = 40  # ensures title bar is visible
    window_top = screen_top + safe_margin
    window_left = screen_left + window_width  # right half
    # Slightly reduce height to prevent overlapping EPGGrid
    adjusted_height = screen_height // 2 - safe_margin
    self.setGeometry(window_left, window_top, window_width, adjusted_height)

    self.epg_data = ET.Element("tv")  # placeholder in case it's accessed early
    self.show()  # Show main window immediately

    # STEP 2: Show blank EPG Grid window (structure only)
    self.epg_grid_window = EPGGridWindow(ET.Element("tv"), [], {}, self)
    self.epg_grid_window.show()
def init_ui(self):
    """Build the widgets, menus, timers and playback controls of the main window."""
    self.splitter = QSplitter(Qt.Horizontal)
    # Left panel with channels (created but kept hidden; it still backs
    # channel navigation through its items)
    self.channel_list = QListWidget()
    self.channel_list.itemClicked.connect(self.on_channel_clicked)
    self.channel_list.setVisible(False) # hide it always
    # Right panel with video and EPG
    right_panel = QWidget()
    layout = QVBoxLayout(right_panel)
    layout.setContentsMargins(0, 0, 0, 0)
    layout.setSpacing(0)
    self.video_container = QWidget()
    self.video_container.setStyleSheet("background-color: black;")
    self.video_container.setMinimumHeight(400)
    video_container_layout = QVBoxLayout(self.video_container)
    video_container_layout.setContentsMargins(0, 0, 0, 0)
    self.video_frame = VideoFrame(self.video_container)
    # Double-clicking the video toggles fullscreen
    self.video_frame.mouseDoubleClickEvent = self.toggle_fullscreen
    video_container_layout.addWidget(self.video_frame)
    layout.addWidget(self.video_container, stretch=3)
    self.player = VLCPlayer(self.video_frame)
    # Menus: File (open playlist / network stream) and Account (edit / log out)
    menu_bar = self.menuBar()
    file_menu = self.menuBar().addMenu("File")
    open_file_action = QAction("Open Playlist", self)
    open_file_action.triggered.connect(self.open_local_playlist)
    file_menu.addAction(open_file_action)
    open_stream_action = QAction("Open Network Stream", self)
    open_stream_action.triggered.connect(self.open_network_stream)
    file_menu.addAction(open_stream_action)
    account_menu = menu_bar.addMenu("Account")
    edit_action = QAction("Add / Edit Account", self)
    edit_action.triggered.connect(self.edit_account)
    account_menu.addAction(edit_action)
    logout_action = QAction("Log Out", self)
    logout_action.triggered.connect(self.logout_account)
    account_menu.addAction(logout_action)
    # EPG Display with new formatting
    self.epg_display = QLabel()
    self.epg_display.setStyleSheet("""
background-color: #222;
color: white;
padding: 10px;
""")
    self.epg_display.setWordWrap(True)
    self.epg_display.setAlignment(Qt.AlignTop | Qt.AlignLeft)
    layout.addWidget(self.epg_display, stretch=1)
    # Timers: a 1-second tick for refresh_epg_time and an hourly EPG reload
    self.clock_timer = QTimer()
    self.clock_timer.timeout.connect(self.refresh_epg_time)
    self.clock_timer.start(1000)
    self.epg_update_timer = QTimer()
    self.epg_update_timer.timeout.connect(self.update_epg_data)
    self.epg_update_timer.start(3600000) # Update EPG every hour
    # Controls
    self.controls = QWidget()
    ctl_layout = QHBoxLayout(self.controls)
    # NOTE(review): the button captions below look like mis-encoded emoji, and
    # one literal is split across two lines -- confirm the intended glyphs.
    for text, slot in [("โฎ๏ธ", self.play_previous_channel),
    ("โฏ๏ธ", self.toggle_play_pause),
    ("โญ๏ธ", self.play_next_channel),
    ("๐ต", self.toggle_fullscreen),
    ("๐
", self.toggle_epg_grid),
    ("๐", self.update_epg_data)]:
        btn = QPushButton(text)
        btn.clicked.connect(slot)
        ctl_layout.addWidget(btn)
    # Volume slider allows values up to 200 and starts at 70
    self.volume_slider = QSlider(Qt.Horizontal)
    self.volume_slider.setRange(0, 200)
    self.volume_slider.setValue(70)
    self.volume_slider.setToolTip("Volume")
    self.volume_slider.valueChanged.connect(self.set_volume)
    ctl_layout.addWidget(self.volume_slider)
    self.mute_button = QPushButton("๐")
    self.mute_button.setCheckable(True)
    self.mute_button.clicked.connect(self.toggle_mute)
    ctl_layout.addWidget(self.mute_button)
    layout.addWidget(self.controls)
    # Only the right panel is added to the splitter here
    self.splitter.addWidget(right_panel)
    self.splitter.setSizes([300, 980])
    self.setCentralWidget(self.splitter)
    self.last_epg_title = ""
    self.last_epg_desc = ""
    # Run the login check shortly after construction
    QTimer.singleShot(500, self.check_login)
def build_playlist_menu(self):
    """Add a "Playlist" menu with one entry per URL in ``self.user_playlists``."""
    menu = self.menuBar().addMenu("Playlist")
    for playlist_url in self.user_playlists:
        entry = QAction(os.path.basename(playlist_url), self)
        # Bind the URL as a default argument so each entry keeps its own value.
        entry.triggered.connect(lambda _, u=playlist_url: self.load_playlist_from_url(u))
        menu.addAction(entry)
def cleanup_resources(self):
    """Stop timers, release the player, shut down the PHP server and quit the app.

    If the PHP process does not exit within two seconds of ``terminate()``,
    it is killed so that no orphaned server outlives the application.
    """
    global php_proc
    self.log_debug("Cleaning up timers and resources")
    if self.epg_grid_window:
        self.epg_grid_window.close()
        self.epg_grid_window.deleteLater()
        self.epg_grid_window = None
    self.clock_timer.stop()
    self.epg_update_timer.stop()
    self.player.stop()
    self.player.release()
    if php_proc:
        try:
            php_proc.terminate()
            php_proc.wait(2)
        except subprocess.TimeoutExpired:
            # The polite request was ignored: escalate instead of leaking the process.
            self.log_debug("[PHP Cleanup] Server did not exit in time, killing it")
            try:
                php_proc.kill()
                php_proc.wait(2)
            except Exception as e:
                self.log_debug(f"[PHP Cleanup] Error: {e}")
        except Exception as e:
            self.log_debug(f"[PHP Cleanup] Error: {e}")
        php_proc = None
    self.log_debug("Cleanup finished โ exiting app")
    QTimer.singleShot(100, QApplication.instance().quit)  # exit only after UI fully cleaned
def closeEvent(self, event):
    """Hide the window at once and schedule the actual cleanup shortly afterwards."""
    cleanup_delay_ms = 100  # lets the UI disappear before cleanup starts
    self.log_debug("Application closing - starting async cleanup")
    # Hiding first makes the exit feel instant to the user.
    self.hide()
    QTimer.singleShot(cleanup_delay_ms, self.cleanup_resources)
    # Allow the close to continue, then let the base class handle the event.
    event.accept()
    super().closeEvent(event)
def detect_resolution(self, item, base):
    """Report the playing stream's resolution/FPS, or show an error overlay if it is not playing.

    Args:
        item: Channel list item whose text is updated with the detected info.
        base: Channel name without the trailing "(...)" suffix.
    """
    vlc_player = self.player.media_player

    if not vlc_player.is_playing():
        print("[VLC] Stream failed โ displaying fallback message")
        vlc_player.stop()

        # Remove any previous error label
        if self.stream_error_label:
            self.stream_error_label.deleteLater()
            self.stream_error_label = None

        # Show persistent "Stream not available" message over the video frame
        self.stream_error_label = overlay = QLabel("Stream is not available", self.video_frame)
        overlay.setAlignment(Qt.AlignCenter)
        overlay.setStyleSheet("""
color: red;
font-size: 18pt;
background-color: rgba(0, 0, 0, 180);
""")
        overlay.setGeometry(0, 0, self.video_frame.width(), self.video_frame.height())
        overlay.show()

        # Restart PHP only for streams served through the local PHP proxy
        stream_url = item.data(Qt.UserRole)
        if stream_url and stream_url.startswith("http://localhost:8888"):
            server_down = php_proc is None or php_proc.poll() is not None
            if server_down:
                print("[PHP] Detected PHP server is not running. Restarting...")
            else:
                print("[PHP] Forcing restart due to stuck stream...")
            restart_php_server(self.php_dir)

        self.current_channel_resolution = "Stream not available"
        self.update_epg_info(self.current_channel_name or base, "Stream not available")
        return

    # Stream is good: remove the error label
    if self.stream_error_label:
        self.stream_error_label.deleteLater()
        self.stream_error_label = None

    # Continue with resolution and FPS
    width = vlc_player.video_get_width()
    height = vlc_player.video_get_height()
    fps = vlc_player.get_fps()
    if not (width > 0 and height > 0):
        return

    # The first threshold the width reaches wins; anything narrower is SD.
    width_classes = ((3840, "4K"), (1920, "FHD"), (1280, "HD"))
    label = next((tag for min_width, tag in width_classes if width >= min_width), "SD")

    info = f"{label}-{fps:.2f} FPS"
    self.channel_video_info[base] = (label, f"{fps:.2f}")
    item.setText(f"{base} ({info})")
    self.current_channel_name = base
    self.current_channel_resolution = info
    self.update_epg_info(base, info)
    if self.epg_grid_window:
        self.epg_grid_window.refresh_channel_fps_info()
def download_playlist(self):
    """Download the default playlist, save a proxied copy, and kick off playback/EPG/logo work.

    Stream URLs on iptv.nywebforum.com are rewritten to the local proxy at
    http://localhost:8888 before the playlist is written to ``php_dir``.
    Any failure (including an HTTP error status) is logged and aborts the
    remaining steps.
    """
    try:
        os.makedirs(self.php_dir, exist_ok=True)
        url = "https://iptv.nywebforum.com/playlist.m3u"
        self.log_debug(f"Downloading playlist from: {url}")
        response = requests.get(url, timeout=10)
        # Without this an HTTP error page would be saved over playlist.m3u.
        response.raise_for_status()

        lines = response.text.splitlines()
        fixed_lines = []
        for line in lines:
            if line.startswith("http") and "iptv.nywebforum.com" in line:
                line = line.replace("https://iptv.nywebforum.com", "http://localhost:8888")
            fixed_lines.append(line)
        content = "\n".join(fixed_lines)

        # Save modified playlist (with local proxy)
        path = os.path.join(self.php_dir, "playlist.m3u")
        with open(path, "w", encoding="utf-8") as f:
            f.write(content)
        self.log_debug(f"Playlist saved to: {path}")

        # Load into app
        # self.load_playlist(path)

        # Step 4: Fill dummy EPG data
        self.insert_dummy_epg_gaps()
        if self.epg_grid_window and self.current_playlist:
            # Defer overlay creation safely inside update_epg()
            self.epg_grid_window.update_epg(
                self.current_playlist.channels,
                self.current_playlist.video_info
            )

        # STEP 5: Start internal PHP server in background
        threading.Thread(target=lambda: restart_php_server(self.php_dir), daemon=True).start()

        # Step 6: Start playing first channel immediately
        if self.channel_list.count() > 0:
            first = self.channel_list.item(0)
            self.channel_list.setCurrentItem(first)
            QTimer.singleShot(500, lambda: self.on_channel_clicked(first))

        # Step 7: Extract EPG URLs from first line if present
        first_line = response.text.split('\n')[0]
        if first_line.startswith("#EXTM3U") and "url-tvg=" in first_line:
            epg_raw = first_line.split('url-tvg="')[1].split('"')[0]
            epg_urls = [u.strip() for u in epg_raw.split(',') if u.strip()]
            new_urls = [u for u in epg_urls if u not in self.default_epg_urls]
            self.default_epg_urls.extend(new_urls)
            if epg_urls:
                self.log_debug(f"Found EPG URLs: {epg_urls}")
                QTimer.singleShot(2000, self.update_epg_data)  # Delayed fetch in background

        # Delay logo prefetching so it runs AFTER first channel is loaded
        QTimer.singleShot(4000, self.start_logo_prefetch)
    except Exception as e:
        self.log_debug(f"Failed to download playlist: {e}")
def edit_account(self):
    """Open the login dialog pre-filled with any stored credentials."""
    saved = AuthManager.load_credentials() or {}
    username = saved.get("username", "")
    password = saved.get("password", "")
    dialog = LoginDialog(self, username, password)
    if dialog.exec_() != QDialog.Accepted:
        return
    QMessageBox.information(self, "Account Updated", "Credentials updated successfully.")
def insert_dummy_epg_gaps(self):
    """Pad each channel's schedule in ``self.epg_data`` with placeholder programmes.

    For every channel id in ``self.channel_order`` the programmes are sorted
    by start time and each hole between the timeline start, consecutive
    programmes and the timeline end is filled with "No Program Information"
    entries of at most one hour. Programmes whose start/stop cannot be
    parsed are dropped, as before. The channel's programmes are then removed
    from the tree and re-appended together with the placeholders.
    """
    if self.epg_data is None:
        return

    local_tz = pytz.timezone("Asia/Karachi")
    timeline_start = self.epg_grid_window.timeline_start if self.epg_grid_window else datetime.now(local_tz)
    timeline_end = self.epg_grid_window.timeline_end if self.epg_grid_window else timeline_start + timedelta(hours=6)

    def to_local(stamp):
        # Only the first 14 characters (YYYYMMDDHHMMSS) are used; the value is read as UTC.
        return (
            datetime.strptime(stamp[:14], "%Y%m%d%H%M%S")
            .replace(tzinfo=timezone.utc)
            .astimezone(local_tz)
        )

    def to_xmltv(moment):
        return moment.astimezone(timezone.utc).strftime("%Y%m%d%H%M%S") + " +0000"

    def placeholders(ch_id, gap_start, gap_end):
        # Build dummy programmes covering [gap_start, gap_end) in chunks of up to one hour.
        fillers = []
        while gap_start < gap_end:
            next_gap = min(gap_end, gap_start + timedelta(hours=1))
            dummy = ET.Element("programme")
            dummy.set("channel", ch_id)
            dummy.set("start", to_xmltv(gap_start))
            dummy.set("stop", to_xmltv(next_gap))
            ET.SubElement(dummy, "title").text = "No Program Information"
            fillers.append(dummy)
            gap_start = next_gap
        return fillers

    # Group programmes by channel in one pass instead of rescanning the
    # whole tree for every channel.
    by_channel = {}
    for programme in self.epg_data.findall(".//programme"):
        by_channel.setdefault(programme.get("channel"), []).append(programme)

    handled = set()
    for _, ch_id in self.channel_order:
        # A repeated id was already padded on its first occurrence.
        if ch_id in handled:
            continue
        handled.add(ch_id)

        progs = by_channel.get(ch_id, [])

        # Convert to (start, stop, element)
        parsed = []
        for p in progs:
            try:
                parsed.append((to_local(p.get("start")), to_local(p.get("stop")), p))
            except (TypeError, ValueError, OverflowError):
                # Missing or malformed timestamp: skip this programme.
                continue
        parsed.sort(key=lambda entry: entry[0])

        new_programmes = []
        current_time = timeline_start
        for start, stop, orig_elem in parsed:
            if start > current_time:
                new_programmes.extend(placeholders(ch_id, current_time, start))
            new_programmes.append(orig_elem)
            current_time = max(current_time, stop)

        # Fill any remaining gap to timeline_end
        if current_time < timeline_end:
            new_programmes.extend(placeholders(ch_id, current_time, timeline_end))

        # Remove old and insert new
        for p in progs:
            self.epg_data.remove(p)
        for p in new_programmes:
            self.epg_data.append(p)
def keyPressEvent(self, event):
    """Leave fullscreen on Escape; pass every other key press to the base class."""
    leaving_fullscreen = event.key() == Qt.Key_Escape and self._is_fullscreen
    if not leaving_fullscreen:
        super().keyPressEvent(event)
        return
    self.toggle_fullscreen()
def load_playlist(self, path):
    """Parse a local M3U file into the channel list and make it the current playlist.

    Each ``#EXTINF`` line supplies the channel name and optional ``tvg-id`` /
    ``tvg-logo`` values; the following non-comment line is the stream URL.
    EPG URLs from a ``url-tvg`` attribute on the first line are added to
    ``self.default_epg_urls``. Errors are logged, not raised.

    Args:
        path: Filesystem path of the playlist to load.
    """
    playlist = PlaylistModel(name="Default", source_url=path)
    try:
        self.log_debug(f"Loading playlist from: {path}")
        # Read once, with the handle closed deterministically; the same
        # lines serve both channel parsing and header inspection below.
        with open(path, encoding="utf-8") as f:
            lines = f.read().splitlines()

        self.channel_list.clear()
        first = None
        name = None
        channel_id = None
        self.channel_order = []  # Reset and initialize channel order

        for line in lines:
            if line.startswith("#EXTINF:"):
                name = line.split(',')[-1].strip()
                if 'tvg-id="' in line:
                    channel_id = line.split('tvg-id="')[1].split('"')[0]
                    self.log_debug(f"Found channel ID: {channel_id} for {name}")
                if 'tvg-logo="' in line:
                    logo = line.split('tvg-logo="')[1].split('"')[0]
                    if name:
                        playlist.logos[name] = logo
                        self.log_debug(f"Logo for {name}: {logo}")
            elif line and not line.startswith("#") and name:
                it = QListWidgetItem(f"{name} (Loading...)")
                it.setData(Qt.UserRole, line)  # stream URL
                if channel_id:
                    it.setData(Qt.UserRole + 1, channel_id)
                playlist.channels.append((name, channel_id))
                # Zero-based position of this channel within the playlist
                it.setData(Qt.UserRole + 2, len(playlist.channels) - 1)
                self.channel_list.addItem(it)
                if first is None:
                    first = it
                name = None
                channel_id = None

        self.playlists.append(playlist)
        self.current_playlist = playlist
        self.insert_dummy_epg_gaps()  # Fill EPG gaps
        if self.epg_grid_window and self.current_playlist:
            self.epg_grid_window.update_epg(
                self.current_playlist.channels,
                self.current_playlist.video_info
            )

        if first:
            self.channel_list.setCurrentItem(first)
            self.log_debug(f"Found {self.channel_list.count()} channels, selecting first one")
            QTimer.singleShot(500, lambda: self.on_channel_clicked(first))

        # Extract EPG from playlist header
        if lines and lines[0].startswith("#EXTM3U") and "url-tvg=" in lines[0]:
            epg_raw = lines[0].split('url-tvg="')[1].split('"')[0]
            epg_urls = [u.strip() for u in epg_raw.split(',') if u.strip()]
            new_urls = [u for u in epg_urls if u not in self.default_epg_urls]
            self.default_epg_urls.extend(new_urls)
            # Schedule EPG update
            if epg_urls:
                self.log_debug(f"Found EPG URLs: {epg_urls}")
                QTimer.singleShot(2000, self.update_epg_data)
    except Exception as e:
        self.log_debug(f"Failed to load playlist: {e}")
def load_playlist_from_url(self, url):
    """Download a playlist from ``url``, save it as user_playlist.m3u, then run the default flow.

    A failed download (network error or HTTP error status) is logged and
    shown to the user in an error dialog.

    Args:
        url: Address of the M3U playlist to fetch.
    """
    try:
        self.log_debug(f"โถ๏ธ [Custom Playlist] Downloading from URL: {url}")
        # Step 1: Download playlist content
        response = requests.get(url, timeout=10)
        # Treat 4xx/5xx as failures instead of saving the error page as a playlist.
        response.raise_for_status()
        content = response.text

        # Step 2: Save to file
        os.makedirs(self.php_dir, exist_ok=True)
        playlist_path = os.path.join(self.php_dir, "user_playlist.m3u")
        with open(playlist_path, "w", encoding="utf-8") as f:
            f.write(content)

        # Step 3: Replace default path and call the same method used by default
        self.playlist_path = playlist_path
        # NOTE(review): download_playlist() as written fetches its own
        # hard-coded URL and does not read self.playlist_path, so the file
        # saved above is not what gets loaded -- confirm the intended flow.
        self.download_playlist()
    except Exception as e:
        self.log_debug(f"โ Failed to load playlist from URL: {e}")
        QMessageBox.critical(self, "Error", f"Failed to load playlist:\n{e}")
def log_debug(self, message):
    """Add debug message to log and print to console"""
    stamped = f"[{datetime.now().strftime('%H:%M:%S')}] {message}"
    self.debug_log.append(stamped)
    print(stamped)
    # The on-screen debug list is optional; skip it when it was never created.
    if not hasattr(self, 'debug_display'):
        return
    self.debug_display.addItem(stamped)
    self.debug_display.scrollToBottom()
def logout_account(self):
    """Forget stored credentials and ask for a new login; exit if the user declines."""
    AuthManager.clear_credentials()
    QMessageBox.information(self, "Logged Out", "You have been logged out.")
    login_dialog = LoginDialog(self)
    accepted = login_dialog.exec_() == QDialog.Accepted
    if not accepted:
        sys.exit(0)
def on_channel_clicked(self, item: QListWidgetItem):
    """Start playing the channel behind ``item`` and refresh the EPG header.

    Two delayed callbacks are scheduled: resolution detection after 5 s and
    one retry after 10 s. Both are ignored if another channel has been
    selected in the meantime, so a late callback for an old channel can no
    longer replace the player or overwrite the current channel's name.

    Args:
        item: Channel list item carrying the stream URL (``Qt.UserRole``),
            guide id (``Qt.UserRole + 1``) and playlist index (``Qt.UserRole + 2``).
    """
    url = item.data(Qt.UserRole)
    if not url:
        return
    base = item.text().split('(')[0].strip()

    # Remember the requested stream so delayed callbacks can detect staleness.
    self._active_stream_url = url

    # Load logo if available
    logo_pixmap = None
    logo_url = self.current_playlist.logos.get(base) if self.current_playlist else None
    if logo_url:
        logo_pixmap = load_logo(logo_url)
    self.current_logo_pixmap = logo_pixmap  # Store for use in update_epg_info

    item.setText(f"{base} (Loading...)")
    # Store 1-based channel number for the EPG header
    self.current_channel_number = (item.data(Qt.UserRole + 2) or 0) + 1
    self.current_channel_name = base
    self.current_channel_resolution = "Loading..."

    channel_id = item.data(Qt.UserRole + 1)
    if channel_id:
        self.current_channel_id = channel_id
        self.update_current_epg()
    else:
        # No guide id: drop programme info left over from the previous
        # channel instead of showing it under this channel's name.
        self.current_channel_id = None
        self.current_channel_epg = None
        self.next_program_epg = None

    # Now that EPG state is settled, display it
    self.update_epg_info(base, "Loading...")

    self.log_debug(f"Playing stream: {url}")
    self.player.stop()
    self.player.play_stream(url)

    def is_stale():
        return getattr(self, "_active_stream_url", None) != url

    def retry_if_failed():
        if is_stale():
            return  # user moved on to another channel
        if self.player.media_player.is_playing():
            return  # first try succeeded
        self.log_debug("[RETRY] First attempt failed, retrying stream...")
        self.player.stop()
        self.player.release()
        self.player = VLCPlayer(self.video_frame)
        if url.startswith("http://localhost:8888"):
            self.log_debug("[RETRY] Waiting for PHP server to be ready...")
            # Try to wait up to 3 seconds max
            if not wait_for_php_server(timeout=3):
                self.log_debug("[RETRY] PHP not ready. Restarting and waiting again...")
                restart_php_server(self.php_dir)
                time.sleep(1.0)
                if not wait_for_php_server(timeout=3):
                    self.log_debug("[RETRY] PHP still not ready. Giving up.")
                    return
        self.player.play_stream(url)

    def detect_if_current():
        if not is_stale():
            self.detect_resolution(item, base)

    QTimer.singleShot(10000, retry_if_failed)
    QTimer.singleShot(5000, detect_if_current)
def open_local_playlist(self):
    """Ask for an M3U file and load it; do nothing if the dialog is cancelled."""
    chosen_path, _selected_filter = QFileDialog.getOpenFileName(
        self, "Open M3U Playlist", "", "M3U Files (*.m3u *.m3u8)"
    )
    if not chosen_path:
        return
    self.load_playlist(chosen_path)
def open_network_stream(self):
    """Prompt for a stream URL and play it when one is confirmed."""
    stream_url, confirmed = QInputDialog.getText(self, "Open Stream", "Enter stream URL:")
    if not (confirmed and stream_url):
        return
    self.play_stream_url(stream_url)
def play_stream_url(self, url):
    """Play an arbitrary stream URL through the window's player.

    Uses the same ``stop()`` / ``play_stream()`` calls as channel playback
    elsewhere in this class, instead of the ``setMedia`` / ``play`` calls
    that were previously made on this player object.

    Args:
        url: Address of the stream to play.
    """
    self.log_debug(f"Playing stream: {url}")
    self.player.stop()
    self.player.play_stream(url)
def play_next_channel(self):
    """Select and play the channel after the current one, if there is one."""
    channels = self.channel_list
    row = channels.currentRow()
    if row >= channels.count() - 1:
        return  # already on the last channel
    channels.setCurrentRow(row + 1)
    self.on_channel_clicked(channels.currentItem())
def play_previous_channel(self):
    """Select and play the channel before the current one, if there is one."""
    channels = self.channel_list
    row = channels.currentRow()
    if row <= 0:
        return  # nothing before the current row
    channels.setCurrentRow(row - 1)
    self.on_channel_clicked(channels.currentItem())
def process_epg_data(self, epg_bytes):
    """Decode (optionally gzip-compressed) EPG XML, store it and refresh the EPG views.

    The raw XML is also written to ``last_epg.xml`` in ``php_dir`` for
    debugging. Any error is logged rather than raised.

    Args:
        epg_bytes: Downloaded EPG payload, either gzip data or plain UTF-8 XML.
    """
    try:
        # Detect gzip by its two-byte magic number rather than attempting
        # decompression and swallowing every exception.
        if epg_bytes[:2] == b"\x1f\x8b":
            epg_xml = gzip.decompress(epg_bytes).decode('utf-8')
            self.log_debug("EPG data decompressed successfully")
        else:
            epg_xml = epg_bytes.decode('utf-8')
            self.log_debug("EPG data was not compressed")

        # Save the raw XML for debugging
        debug_epg_path = os.path.join(self.php_dir, "last_epg.xml")
        with open(debug_epg_path, "w", encoding="utf-8") as f:
            f.write(epg_xml)
        self.log_debug(f"Saved EPG data to {debug_epg_path}")

        # Parse the XML
        EPGStore.set(ET.fromstring(epg_xml))
        self.insert_dummy_epg_gaps()
        self.epg_last_update = datetime.now()

        if self.epg_grid_window and self.current_playlist:
            # Defer overlay creation safely inside update_epg()
            self.epg_grid_window.update_epg(
                self.current_playlist.channels,
                self.current_playlist.video_info
            )
        else:
            # Dispose of any existing grid window first so repeated EPG
            # loads without a playlist do not pile up windows.
            if self.epg_grid_window:
                self.epg_grid_window.close()
                self.epg_grid_window.deleteLater()
            # Create fresh EPGGridWindow AFTER full EPG is available
            self.epg_grid_window = EPGGridWindow(
                EPGStore.get(),
                self.channel_order,
                self.channel_video_info,
                self,
                channel_logos=self.current_playlist.logos if self.current_playlist else {}
            )
            self.epg_grid_window.show()

        self.log_debug(f"EPG data parsed successfully, last update: {self.epg_last_update}")

        # Update current channel's EPG if available
        if hasattr(self, 'current_channel_id') and self.current_channel_id:
            print("[EPG] Initializing current channel EPG immediately after download")
            self.update_current_epg()
            self.update_epg_info(self.current_channel_name, self.current_channel_resolution)

        # Setup auto-refresh aligned to clock
        self.setup_epg_auto_refresh()
    except Exception as e:
        self.log_debug(f"Error processing EPG data: {str(e)}")
def refresh_epg_time(self):
    """Re-render the EPG header from the name/resolution shown in the label's first line.

    Does nothing until a channel has been selected, or when the label's
    first line does not contain a ``name | resolution`` pair.
    """
    if not hasattr(self, 'current_channel_name') or not hasattr(self, 'current_channel_resolution'):
        return

    parts = self.epg_display.text().split("\n", 1)
    if len(parts) != 2:
        return
    first_line = parts[0]
    if "|" not in first_line:
        return

    name_resolution = first_line.split("|")
    if len(name_resolution) < 2:
        return

    # NOTE(review): both replace() calls have empty search strings as seen
    # here, which makes them no-ops; the markup they were meant to strip
    # appears to be missing -- confirm.
    name = name_resolution[0].replace("", "").replace("", "").strip()
    resolution = name_resolution[1].strip()
    # update_epg_info() accepts only (name, resolution) and reads the
    # current programme from self; the extra argument raised TypeError.
    self.update_epg_info(name, resolution)
def refresh_channel_logos_in_list(self):
    """Load the logo for every listed channel that has one; the list itself is not changed."""
    for row in range(self.channel_list.count()):
        entry = self.channel_list.item(row)
        channel_name = entry.text().split('(')[0].strip()
        logo_url = None
        if self.current_playlist:
            logo_url = self.current_playlist.logos.get(channel_name)
        if not logo_url:
            continue
        pixmap = load_logo(logo_url)
        if pixmap:
            # Optionally update GUI with logo here if you're showing them in list
            pass
def retry_detect_resolution(self, item, base, retries=6):
    """Query stream info from the player, retrying every 10 seconds while none is available.

    Args:
        item: Channel list item, passed through to the retries.
        base: Channel name used when no current channel name is set.
        retries: Remaining retry attempts.
    """
    stream_info = self.player.get_stream_info()
    shown_name = self.current_channel_name or base

    if stream_info and stream_info != "NA":
        self.current_channel_resolution = stream_info
        self.update_epg_info(shown_name, stream_info)
        return

    self.current_channel_resolution = "Stream not available"
    self.update_epg_info(shown_name, "Stream not available")
    if retries > 0:
        QTimer.singleShot(10000, lambda: self.retry_detect_resolution(item, base, retries - 1))
def set_volume(self, value):
    """Apply the slider value as player volume; a value of 0 also mutes the player."""
    self.player.media_player.audio_set_volume(value)
    if value == 0:
        # Zero volume is treated as mute and reflected on the mute button.
        self.player.media_player.audio_set_mute(True)
        self.mute_button.setChecked(True)
        # NOTE(review): caption looks like a mis-encoded emoji -- confirm the intended glyph.
        self.mute_button.setText("๐")
    else:
        self.player.media_player.audio_set_mute(False)
        self.mute_button.setChecked(False)
        # NOTE(review): caption looks like a mis-encoded emoji -- confirm the intended glyph.
        self.mute_button.setText("๐")
def setup_epg_auto_refresh(self):
    """Schedule ``start_half_hour_timer`` for the next :00 or :30 boundary.

    The boundary is computed with ``timedelta`` so the roll-over past
    midnight is correct; rewriting only the hour field at 23:30 or later
    produced a time earlier the same day and therefore a negative delay.
    """
    now = datetime.now()
    # Round down to the current half-hour mark, then step one half hour ahead.
    half_hour_floor = now.replace(
        minute=30 if now.minute >= 30 else 0, second=0, microsecond=0
    )
    next_run = half_hour_floor + timedelta(minutes=30)
    delay_ms = int((next_run - now).total_seconds() * 1000)
    QTimer.singleShot(delay_ms, self.start_half_hour_timer)
def show_status_message(self, msg):
    """Show ``msg`` in the EPG display label."""
    # NOTE(review): the literal below is split across two lines; any markup
    # that originally wrapped {msg} appears to be missing -- confirm.
    self.epg_display.setText(f"{msg}
")
def start_half_hour_timer(self):
    """Refresh the EPG header now, then repeat via a timer every 30 minutes."""
    half_hour_ms = 30 * 60 * 1000

    # Bring the current programme info up to date straight away.
    self.update_current_epg()
    self.update_epg_info(self.current_channel_name, self.current_channel_resolution)

    # Keep the timer on self so it stays referenced while it runs.
    self._epg_timer = QTimer()
    self._epg_timer.timeout.connect(self.trigger_epg_update)
    self._epg_timer.start(half_hour_ms)
def start_logo_prefetch(self):
    """Download the current playlist's logos into ``CACHE_DIR`` on a daemon thread.

    Each logo is stored under the MD5 of its URL with a ``.png`` suffix and
    is skipped when that file already exists. A display update is requested
    per downloaded logo, and a final refresh once the whole pass is done.
    """
    # The previous up-front loop over the channel list only computed values
    # it never used, so it has been removed.

    def prefetch_logos():
        os.makedirs(CACHE_DIR, exist_ok=True)
        for name, url in list(self.current_playlist.logos.items()) if self.current_playlist else []:
            if not url:
                continue
            try:
                logo_filename = hashlib.md5(url.encode('utf-8')).hexdigest() + ".png"
                logo_path = os.path.join(CACHE_DIR, logo_filename)
                if not os.path.exists(logo_path):
                    response = requests.get(url, timeout=5)
                    if response.status_code == 200:
                        with open(logo_path, "wb") as f:
                            f.write(response.content)
                        print(f"[LOGO] ✅ Downloaded: {name}")
                        # NOTE(review): issued from the worker thread; confirm
                        # the callback is delivered on the GUI thread.
                        QTimer.singleShot(0, lambda n=name: self.update_logo_display(n))
            except Exception as e:
                print(f"[LOGO] Failed to fetch logo for {name}: {e}")

        # After all done, refresh visuals
        if self.epg_grid_window:
            QTimer.singleShot(0, self.epg_grid_window.refresh_channel_logos)
        QTimer.singleShot(0, self.refresh_channel_logos_in_list)

    threading.Thread(target=prefetch_logos, daemon=True).start()
    self.log_debug("๐ผ๏ธ Background logo fetcher started...")
def toggle_epg_grid(self):
    """Show the EPG grid window if hidden, hide it if visible; create it on first use."""
    if self.epg_grid_window is None:
        # Use dummy XML if EPG not ready yet
        placeholder_epg = ET.Element("tv") if self.epg_data is None else self.epg_data
        self.epg_grid_window = EPGGridWindow(
            placeholder_epg, self.channel_order, self.channel_video_info, self
        )

    grid = self.epg_grid_window
    if not grid.isVisible():
        grid.show()
    else:
        grid.hide()
def toggle_fullscreen(self, event=None):
    """Switch between frameless fullscreen video and the normal window layout.

    Args:
        event: Ignored; present so the method can also serve as a mouse-event handler.
    """
    entering = not self._is_fullscreen

    if entering:
        # Remember the windowed state so it can be restored later.
        self._normal_geometry = self.geometry()
        self._normal_flags = self.windowFlags()
        self.channel_list.setVisible(False)
        self.epg_display.hide()
        self.controls.hide()
        self.setWindowFlags(self._normal_flags | Qt.FramelessWindowHint)
        self.showFullScreen()
    else:
        self.setWindowFlags(self._normal_flags)
        self.showNormal()
        if self._normal_geometry:
            self.setGeometry(self._normal_geometry)
        self.channel_list.setVisible(False)
        self.epg_display.show()
        self.controls.show()

    self._is_fullscreen = entering
def toggle_mute(self):
    """Invert the player's mute state and mirror the new state on the mute button."""
    is_muted = self.player.media_player.audio_get_mute()
    self.player.media_player.audio_set_mute(not is_muted)
    # NOTE(review): both captions look like mis-encoded emoji and appear
    # identical here -- confirm the intended muted/unmuted glyphs.
    self.mute_button.setText("๐" if not is_muted else "๐")
    self.mute_button.setChecked(not is_muted)
def toggle_play_pause(self):
    """Pause the player when it is playing, otherwise start playback."""
    media_player = self.player.media_player
    action = media_player.pause if media_player.is_playing() else media_player.play
    action()
def trigger_epg_update(self):
    """Refresh the current channel's EPG at the half-hour mark and re-centre the grid.

    The scroll request goes to the EPG grid window when it exists and
    offers ``scroll_to_current_time``; this class defines no such method,
    so calling it on ``self`` raised AttributeError on every timer tick.
    """
    print("[EPG] Auto-updating at 30-minute boundary")
    self.update_current_epg()
    self.update_epg_info(self.current_channel_name, self.current_channel_resolution)

    grid = self.epg_grid_window
    if grid is not None and hasattr(grid, "scroll_to_current_time"):
        grid.scroll_to_current_time()
def update_current_epg(self, epg_data=None):
    """Find the current and next programme of the selected channel in the EPG store.

    Results are stored in ``self.current_channel_epg`` and
    ``self.next_program_epg`` as dicts (title/start/stop/desc) or ``None``.

    Args:
        epg_data: Optional replacement for ``self.epg_data``.
    """
    if epg_data is not None:
        self.epg_data = epg_data
    if self.epg_data is None or not hasattr(self, 'current_channel_id'):
        return

    try:
        # Get current LOCAL time in Asia/Karachi
        local_tz = pytz.timezone("Asia/Karachi")
        now_local = datetime.now(local_tz)

        def to_local(stamp):
            # EPG timestamps are parsed as UTC and converted to local time.
            parsed_utc = datetime.strptime(stamp[:14], "%Y%m%d%H%M%S").replace(tzinfo=timezone.utc)
            return parsed_utc.astimezone(local_tz)

        def summarize(prog, fallback_title):
            if prog is None:
                return None
            return {
                'title': unquote(prog.findtext('title', fallback_title)),
                'start': prog.get('start'),
                'stop': prog.get('stop'),
                'desc': unquote(prog.findtext('desc', ''))
            }

        current_prog = None
        next_prog = None
        for programme in EPGStore.get().findall(".//programme"):
            if programme.get("channel") != self.current_channel_id:
                continue
            try:
                start_local = to_local(programme.get("start"))
                stop_local = to_local(programme.get("stop"))
                if start_local <= now_local < stop_local:
                    current_prog = programme
                elif start_local > now_local:
                    next_prog = programme
                    break  # EPG is sorted by time, so first match is sufficient
            except Exception as e:
                print(f"[EPG Parse Error] Failed to parse start/stop: {e}")
                continue

        # Set current program, then next program
        self.current_channel_epg = summarize(current_prog, 'No Programme Information')
        self.next_program_epg = summarize(next_prog, 'No information')

        # If current program expired, force immediate display update
        if self.current_channel_epg:
            stop_time_str = self.current_channel_epg.get('stop')
            try:
                stop_time_local = to_local(stop_time_str)
                header_ready = hasattr(self, 'current_channel_name') and hasattr(self, 'current_channel_resolution')
                if now_local >= stop_time_local and header_ready:
                    self.update_epg_info(self.current_channel_name, self.current_channel_resolution)
            except Exception as e:
                print(f"[EPG Stop Check] Failed to parse stop time: {e}")
    except Exception as e:
        print(f"Error updating current EPG: {str(e)}")
def update_epg_data(self):
    """Re-download every default EPG source and reload the cached custom source list."""
    if not self.default_epg_urls:
        self.log_debug("No default EPG URLs available")
        return

    # Clear previous EPG before bulk reload
    EPGStore.clear()
    for epg_url in self.default_epg_urls:
        self.log_debug(f"Downloading EPG from: {epg_url}")
        try:
            reply = requests.get(epg_url, timeout=10)
            reply.raise_for_status()
            self.process_epg_data(reply.content)
        except Exception as e:
            self.log_debug(f"[EPG] Failed to download from {epg_url}: {e}")

    # Load cached custom EPGs
    cache_path = os.path.join(self.php_dir, "custom_epg_sources.txt")
    if os.path.exists(cache_path):
        try:
            with open(cache_path, "r", encoding="utf-8") as f:
                for src in map(str.strip, f):
                    if not src or src in self.custom_epg_sources:
                        continue
                    self.custom_epg_sources.append(src)
                    print(f"[CACHE] Loaded custom EPG source: {src}")
        except Exception as e:
            print(f"[CACHE] Failed to load EPG cache: {e}")

    self.setup_epg_auto_refresh()
def update_epg_info(self, name=None, resolution=None):
    """Render the channel header, current programme and next programme into the EPG label.

    Args:
        name: Channel name to display.
        resolution: Resolution/status text to display next to the name.
    """
    # NOTE(review): the string literals in this method look like the text
    # remainder of markup (one literal is even split across lines); confirm
    # the intended markup before editing any of them.
    # Nothing to render until a channel has been selected.
    if not hasattr(self, "current_channel_name"):
        return
    logo_html = ""
    if getattr(self, "current_logo_pixmap", None):
        from PyQt5.QtCore import QBuffer
        # Serialise the logo pixmap to PNG and base64-encode it.
        buffer = QBuffer()
        buffer.open(QBuffer.ReadWrite)
        self.current_logo_pixmap.save(buffer, "PNG")
        base64_data = bytes(buffer.data().toBase64()).decode()
        logo_html = f"
"
    ch_num = getattr(self, "current_channel_number", "")
    num_prefix = f"{ch_num}. " if ch_num != "" else ""
    # Top section (name, resolution, title, time - with logo)
    epg_text = f"""
{num_prefix}{name}
{resolution}
"""
    # Current program (title + time + progress)
    if self.current_channel_epg:
        title = self.current_channel_epg.get("title", "No Programme Information")
        epg_text += f"""
{title}
"""
        if "start" in self.current_channel_epg and "stop" in self.current_channel_epg:
            try:
                # Timestamps are parsed as UTC and shown in Asia/Karachi time.
                local_tz = pytz.timezone("Asia/Karachi")
                start_local = datetime.strptime(
                self.current_channel_epg["start"][:14], "%Y%m%d%H%M%S"
                ).replace(tzinfo=timezone.utc).astimezone(local_tz)
                stop_local = datetime.strptime(
                self.current_channel_epg["stop"][:14], "%Y%m%d%H%M%S"
                ).replace(tzinfo=timezone.utc).astimezone(local_tz)
                now_local = datetime.now(local_tz)
                start_str = start_local.strftime("%I:%M%p").lstrip("0")
                stop_str = stop_local.strftime("%I:%M%p").lstrip("0")
                duration = int((stop_local - start_local).total_seconds() // 60)
                elapsed = (now_local - start_local).total_seconds()
                total = (stop_local - start_local).total_seconds()
                # Progress is clamped to 0..100; a zero-length programme counts as 0 %.
                percent = max(0, min(100, int((elapsed / total) * 100))) if total > 0 else 0
                progress_bar = f"""
"""
                epg_text += f"""
{start_str} โ {stop_str} ({duration} mins)
{progress_bar}
"""
            except Exception:
                # Unparseable times: the time/progress lines are simply omitted.
                pass
    else:
        epg_text += """
No Programme Information
"""
    epg_text += f"""
|
{logo_html} |
"""
    # Current program description (placed below the top section)
    if self.current_channel_epg and self.current_channel_epg.get("desc"):
        epg_text += f"""
{self.current_channel_epg['desc']}
"""
    # Next program (after current description)
    if self.next_program_epg:
        epg_text += f"""
Next: {self.next_program_epg.get('title', 'No information')}
"""
        if self.next_program_epg.get("desc"):
            epg_text += f"""
{self.next_program_epg['desc']}
"""
        if "start" in self.next_program_epg and "stop" in self.next_program_epg:
            try:
                local_tz = pytz.timezone("Asia/Karachi")
                start_local = datetime.strptime(
                self.next_program_epg["start"][:14], "%Y%m%d%H%M%S"
                ).replace(tzinfo=timezone.utc).astimezone(local_tz)
                stop_local = datetime.strptime(
                self.next_program_epg["stop"][:14], "%Y%m%d%H%M%S"
                ).replace(tzinfo=timezone.utc).astimezone(local_tz)
                start_str = start_local.strftime("%I:%M%p").lstrip("0")
                stop_str = stop_local.strftime("%I:%M%p").lstrip("0")
                epg_text += f"""
{start_str} โ {stop_str}
"""
            except Exception:
                # Unparseable times: the next-programme time line is omitted.
                pass
    self.epg_display.setText(epg_text)
def update_logo_display(self, name):
    """Push the cached logo of channel *name* into the widgets that show it.

    Returns without doing anything when the current playlist has no logo URL
    for *name*, or when ``load_logo`` yields nothing for that URL.  Otherwise:

    * if an EPG grid window exists and tracks a logo item for *name*, that
      item receives a 40x30 aspect-preserving, smoothly scaled copy;
    * if *name* equals ``self.current_channel_name`` (when that attribute
      exists), the full-size pixmap is stored in ``self.current_logo_pixmap``
      and the EPG info panel is refreshed via ``update_epg_info``.

    Pending Qt events are processed before returning.
    """
    url = self.current_playlist.logos.get(name)
    logo = load_logo(url) if url else None
    if not logo:
        return

    # EPG grid window: replace the per-channel logo item with a thumbnail.
    grid = self.epg_grid_window
    if grid and name in grid.channel_logo_items:
        thumbnail = logo.scaled(40, 30, Qt.KeepAspectRatio, Qt.SmoothTransformation)
        grid.channel_logo_items[name].setPixmap(thumbnail)
        print(f"[LOGO] โ Displayed in EPG grid: {name}")

    # Main window: only relevant when this channel is the one being played.
    is_current = hasattr(self, 'current_channel_name') and self.current_channel_name == name
    if is_current:
        self.current_logo_pixmap = logo
        self.update_epg_info(name, self.current_channel_resolution)
        print(f"[LOGO] ๐ฌ Displayed in main window: {name}")

    QApplication.processEvents()
class LogoManager(QObject):
    """Fetch channel logos into the on-disk cache on a background thread.

    ``logo_downloaded`` is emitted with the channel name once that channel's
    logo file is present in ``CACHE_DIR`` (either it was already cached or it
    has just been downloaded successfully).
    """

    logo_downloaded = pyqtSignal(str)  # emits channel_name

    def __init__(self, playlist_model):
        """Remember the playlist whose ``logos`` mapping (name -> URL) is fetched."""
        super().__init__()
        self.playlist = playlist_model

    def start_download(self):
        """Start a daemon thread that downloads every logo not yet cached.

        Each logo is handled independently: a failure for one channel is
        logged and does not stop the remaining downloads.  Responses with a
        status other than 200 are skipped without caching anything.
        """
        def worker():
            os.makedirs(CACHE_DIR, exist_ok=True)
            # Iterate over a snapshot so playlist changes cannot break the loop.
            for name, url in list(self.playlist.logos.items()):
                try:
                    logo_path = get_logo_path(url)
                    if not os.path.exists(logo_path):
                        response = requests.get(url, timeout=5)
                        if response.status_code != 200:
                            continue
                        # Write to a private temp file and rename it into place,
                        # so readers of the cache never see a half-written logo
                        # and an interrupted download is not mistaken for a
                        # cached file on the next run.
                        partial_path = f"{logo_path}.{threading.get_ident()}.part"
                        with open(partial_path, "wb") as f:
                            f.write(response.content)
                        os.replace(partial_path, logo_path)
                        print(f"[LOGO] ✅ Downloaded: {name}")
                    self.logo_downloaded.emit(name)
                except Exception as e:
                    # Per-logo boundary: report and carry on with the next one.
                    print(f"[LOGO] ❌ Failed to fetch logo for {name}: {e}")

        threading.Thread(target=worker, daemon=True).start()
class NowLineOverlay(QWidget):
    """Mouse-transparent overlay that paints a red vertical "now" line.

    The line position is derived from the grid window's ``timeline_start`` and
    ``timeline_slot_width``; one slot width corresponds to 1800 seconds.
    """

    def __init__(self, parent, grid_window):
        """Create the overlay as a child of *parent*, bound to *grid_window*."""
        super().__init__(parent)
        self.grid_window = grid_window
        # Slot width as it was when the overlay was created.
        self.slot_width = grid_window.timeline_slot_width
        # Let clicks fall through to the widgets underneath.
        self.setAttribute(Qt.WA_TransparentForMouseEvents)
        self.setStyleSheet("background: transparent;")
        self.setVisible(True)

    def paintEvent(self, event):
        """Draw the red line at the x position matching the current time."""
        from PyQt5.QtGui import QPainter, QPen

        painter = QPainter(self)
        painter.setPen(QPen(Qt.red, 2))

        # NOTE(review): the subtraction requires timeline_start to be
        # timezone-aware as well - confirm against the grid window.
        karachi_now = datetime.now(pytz.timezone("Asia/Karachi"))
        since_start = (karachi_now - self.grid_window.timeline_start).total_seconds()
        shifted_seconds = since_start + RED_LINE_SHIFT_SECONDS

        # Width in pixels of one 1800-second slot, read live from the grid.
        slot_pixels = self.grid_window.timeline_slot_width
        line_x = int((shifted_seconds / 1800) * slot_pixels)
        painter.drawLine(line_x, 0, line_x, self.height())
class PlaylistModel:
    """Plain container for everything known about one playlist."""

    def __init__(self, name, source_url, channels=None, epg_data=None, logos=None):
        """Store the playlist data.

        Any of *channels*, *epg_data* and *logos* that is falsy (``None`` or
        an empty container) is replaced by a fresh empty container.
        """
        self.name, self.source_url = name, source_url
        # List of tuples: (name, id)
        self.channels = channels if channels else []
        # Dict: channel_name -> list of programs
        self.epg_data = epg_data if epg_data else {}
        # Dict: channel_name -> logo URL
        self.logos = logos if logos else {}
        # Dict: channel_name -> (resolution, fps)
        self.video_info = {}
class TimelineLabel(QLabel):
    """Label representing one programme block in the EPG timeline."""

    def __init__(self, text, start_offset, duration, parent_grid, timeline_slot_width):
        """Build the label.

        *text* is shown word-wrapped and centred; *start_offset*, *duration*
        and *timeline_slot_width* are only stored on the instance here.
        *parent_grid* is the grid object whose ``main_window`` is used to
        switch channels on double-click.
        """
        super().__init__()
        self.parent_grid = parent_grid
        self.full_text = text
        self.start_offset = start_offset
        self.duration = duration
        self.timeline_slot_width = timeline_slot_width

        self.setWordWrap(True)  # Allow multi-line
        self.setAlignment(Qt.AlignHCenter | Qt.AlignVCenter)
        self.setText(self.full_text)  # Let QLabel handle the text
        # Only one stylesheet is applied: each setStyleSheet call replaces the
        # previous sheet, so the earlier variants never took effect.
        self.setStyleSheet("border: 1px solid #444; padding: 2px; background-color: #333;")

    def mouseDoubleClickEvent(self, event):
        """Select and play the channel this block belongs to, then accept the event."""
        print("[DEBUG] Program grid double-click detected")
        main_window = self.parent_grid.main_window if self.parent_grid else None
        if main_window:
            try:
                # NOTE(review): this class defines no data() accessor and QLabel
                # is not known to provide one; unless it is supplied elsewhere,
                # this raises AttributeError and is reported by the handler below.
                channel_number = self.data(1)
                print(f"[DEBUG] Double-clicked EPG block for channel number: {channel_number}")
                if channel_number is not None:
                    channel_list = main_window.channel_list
                    for row in range(channel_list.count()):
                        item = channel_list.item(row)
                        if item.data(Qt.UserRole + 2) == channel_number:
                            print(f"[DEBUG] Attempting to play channel: {item.text()}")
                            channel_list.setCurrentItem(item)
                            main_window.on_channel_clicked(item)
                            break
            except Exception as e:
                # Event-handler boundary: never let an error escape into Qt.
                print(f"[ERROR] Failed to handle double-click: {e}")
        event.accept()
class VideoFrame(QFrame):
    """QFrame with a solid black background."""

    def __init__(self, parent=None):
        """Create the frame under *parent* and paint its background black."""
        super().__init__(parent)
        black_background = "background-color: black;"
        self.setStyleSheet(black_background)
class VLCPlayer:
    """Thin wrapper around a libVLC media player rendering into a Qt widget."""

    def __init__(self, video_frame):
        """Create the VLC instance/player and attach it to *video_frame*.

        *video_frame* must provide ``winId()``; the native handle is handed
        to libVLC with the call that matches the current platform.
        """
        self.instance = vlc.Instance()
        self.media_player = self.instance.media_player_new()
        window_id = int(video_frame.winId())
        if sys.platform == "win32":
            self.media_player.set_hwnd(window_id)
        elif sys.platform == "darwin":
            # macOS has no X11 window; libVLC expects the NSView handle instead.
            self.media_player.set_nsobject(window_id)
        else:
            self.media_player.set_xwindow(window_id)
        # Single-shot watchdog armed by play_stream().
        self.playback_timeout_timer = QTimer()
        self.playback_timeout_timer.setSingleShot(True)
        self.playback_timeout_timer.timeout.connect(self.check_playback_timeout)

    def check_playback_timeout(self):
        """Stop the player if it is not playing when the watchdog fires."""
        if not self.media_player.is_playing():
            print("[VLC] Stream timeout — stopping playback")
            self.media_player.stop()

    def play_stream(self, url):
        """Stop whatever is playing, then start *url* and arm the 10 s watchdog."""
        try:
            self.media_player.stop()  # Make sure it's fully stopped before new media
        except Exception:
            # Best effort: a failed stop must not prevent the new stream.
            pass
        media = self.instance.media_new(url)
        # Optimized for faster startup
        media.add_option(":network-caching=1000")
        media.add_option(":file-caching=500")
        media.add_option(":tcp-caching=500")
        media.add_option(":avcodec-hw=any")  # Try hardware decoding when possible
        # Per-media options take the ":" prefix; the "--" form is the
        # command-line/instance syntax and is not recognised here.
        media.add_option(":no-drop-late-frames")
        media.add_option(":no-skip-frames")
        media.add_option(":http-reconnect")
        #media.add_option(":http-continuous")
        media.add_option(":ffmpeg-skiploopfilter=all")
        media.add_option(":ffmpeg-skipframe=default")
        self.media_player.set_media(media)
        self.media_player.audio_set_volume(70)
        self.media_player.play()
        # Start timeout safety
        self.playback_timeout_timer.start(10000)

    def release(self):
        """Release the player and the VLC instance; errors are logged, not raised."""
        # Make sure the watchdog cannot fire against a released player.
        self.playback_timeout_timer.stop()
        try:
            self.media_player.release()
        except Exception as e:
            print(f"[VLC] Failed to release media player: {e}")
        try:
            self.instance.release()
        except Exception as e:
            print(f"[VLC] Failed to release VLC instance: {e}")

    def stop(self):
        """Stop playback and cancel the pending watchdog."""
        self.playback_timeout_timer.stop()
        self.media_player.stop()
def get_logo_path(url):
    """Return the cache file path for *url*: ``CACHE_DIR/<md5 of url>.png``."""
    digest = hashlib.md5(url.encode('utf-8')).hexdigest()
    filename = digest + ".png"
    return os.path.join(CACHE_DIR, filename)
def load_logo(url):
    """Return the cached logo for *url* as a QPixmap, or None.

    Nothing is downloaded here: when the logo is not in the cache yet the
    function returns None.  None is also returned when the cached file exists
    but cannot be decoded as an image, so callers never receive a null pixmap.
    """
    path = get_logo_path(url)
    if not os.path.exists(path):
        # Instead of blocking on a download, skip logos that are not cached.
        return None
    pixmap = QPixmap(path)
    # QPixmap does not raise on an unreadable file; it yields a null pixmap.
    if pixmap.isNull():
        return None
    return pixmap
def monitor_php_server(php_dir):
    """Restart the PHP server from *php_dir* if the tracked process has exited.

    Does nothing when no process is tracked in ``php_proc`` or when the
    tracked process is still running.
    """
    global php_proc
    if not php_proc or php_proc.poll() is None:
        return
    print("[WARNING] PHP server exited. Restarting...")
    start_php_server(php_dir)
def restart_php_server(php_dir):
    """Stop the tracked PHP server (if any) and start a new one in *php_dir*.

    The old process is asked to terminate and given two seconds to exit; if
    it is still alive after that it is killed, so that it cannot keep the
    listening port occupied when the replacement starts.  Errors while
    stopping are logged and never prevent the new server from being started.
    """
    global php_proc
    if php_proc:
        print("[PHP] Restarting embedded PHP server...")
        try:
            php_proc.terminate()
            php_proc.wait(timeout=2)
        except subprocess.TimeoutExpired:
            try:
                php_proc.kill()
                php_proc.wait(timeout=2)
            except Exception as e:
                print(f"[PHP] Error terminating PHP server: {e}")
        except Exception as e:
            print(f"[PHP] Error terminating PHP server: {e}")
    # Start again
    php_proc = start_php_server(php_dir)
def start_php_server(php_dir, port=8888):
    """Launch ``php -S localhost:<port>`` from *php_dir*.

    Side effects: the working directory of this process is changed to
    *php_dir*, and on success the new process is stored in the module-level
    ``php_proc``.  Returns the ``Popen`` object, or None if anything fails
    (the failure is printed).
    """
    global php_proc
    try:
        os.chdir(php_dir)
        # Suppress the console window that would otherwise pop up on Windows.
        window_flags = subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0
        command = ["php", "-S", f"localhost:{port}"]
        php_proc = subprocess.Popen(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            creationflags=window_flags,
        )
    except Exception as e:
        print(f"Failed to start PHP server: {e}")
        return None
    return php_proc
def wait_for_php_server(host='localhost', port=8888, timeout=3):
    """Wait until PHP server is accepting connections.

    Repeatedly tries to open a TCP connection to ``(host, port)`` (each
    attempt limited to 0.5 s, with a 0.1 s pause after a failure) until one
    succeeds or *timeout* seconds have passed.

    Returns True as soon as a connection succeeds, False on timeout.
    """
    # A monotonic clock keeps the deadline correct even if the system time
    # is adjusted while we are waiting.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=0.5):
                return True
        except OSError:
            # Covers refused connections and timeouts (both OSError subclasses).
            time.sleep(0.1)
    return False
if __name__ == "__main__":
    # Application entry point: start the embedded PHP server, build the Qt
    # windows, run the event loop, then shut the PHP server down again.

    if platform.system() == "Windows":
        # The DPI awareness context has to be chosen before any window (and
        # therefore before the QApplication) is created; later calls fail.
        try:
            import ctypes
            # DPI_AWARENESS_CONTEXT_PER_MONITOR_AWARE_V2 is the pseudo-handle -4.
            awareness_context = ctypes.c_void_p(-4)
            if ctypes.windll.user32.SetProcessDpiAwarenessContext(awareness_context):
                print("[DPI] Enabled Per-Monitor V2 DPI Awareness")
            else:
                print("[DPI] DPI awareness setting failed:", ctypes.GetLastError())
        except Exception as e:
            print("[DPI] DPI awareness setting failed:", e)

    php_dir = os.path.join(os.path.dirname(__file__), "php_files")
    os.makedirs(php_dir, exist_ok=True)
    php_proc = start_php_server(php_dir)

    app = QApplication(sys.argv)
    window = IPTVWindow(php_dir)
    # Backup cleanup: make sure the main window is closed when the app quits.
    app.aboutToQuit.connect(window.close)
    window.show()

    # Now create and show EPG window AFTER main window is visible
    if window.epg_grid_window is None:
        window.epg_grid_window = EPGGridWindow(
            window.epg_data,
            window.channel_order,
            window.channel_video_info,
            window,
            channel_logos=window.channel_logos
        )
    window.epg_grid_window.show()

    # Check every 15 s that the PHP server is still alive.  The timer is kept
    # in a module-level name so it stays referenced while the loop runs.
    php_timer = QTimer()
    php_timer.timeout.connect(lambda: monitor_php_server(php_dir))
    php_timer.start(15000)

    # Debug active threads after 1 second
    QTimer.singleShot(1000, lambda: print(f"[DEBUG] Threads at 1s: {[t.name for t in threading.enumerate()]}"))

    exit_code = app.exec_()

    # Stop PHP if still running
    if php_proc:
        php_proc.terminate()
        try:
            php_proc.wait(2)
        except subprocess.TimeoutExpired:
            # Do not let a stubborn server block or abort the exit path.
            php_proc.kill()
    sys.exit(exit_code)