"""Forex Factory news scraper.

Scrapes the Forex Factory calendar (forced to Pakistan time), writes hourly
report files and per-event high-impact alert files, and copies the alert
files into MT4/MT5 ``Files`` folders so Expert Advisors can react to news.
"""

import os
import cloudscraper
from bs4 import BeautifulSoup
from datetime import datetime, timedelta
import pytz
import time
from threading import Thread
from collections import defaultdict
import sys
import re
import logging
import shutil

# āœ… Copy the alert file to MT4's MQL4/Files folder
# Office Laptop IC Market MT4
mt4_files_path = r"C:/Users/engr_/AppData/Roaming/MetaQuotes/Terminal/5D49F47D1EA1ECFC0DDC965B6D100AC5/MQL4/Files"  # <- CHANGE THIS
mt5_files_path = r"C:/Users/engr_/AppData/Roaming/MetaQuotes/Terminal/010E047102812FC0C18890992854220E/MQL5/Files"  # <- CHANGE THIS
# UK IC Market MT4
# mt4_files_path = r"C:/Users/M2-UK-Office/AppData/Roaming/MetaQuotes/Terminal/5D49F47D1EA1ECFC0DDC965B6D100AC5/MQL4/Files"
# USA IC Market MT4
# mt4_files_path = r"C:/Users/Trading/AppData/Roaming/MetaQuotes/Terminal/5D49F47D1EA1ECFC0DDC965B6D100AC5/MQL4/Files"
# mt5_files_path = r"C:/Users/Trading/AppData/Roaming/MetaQuotes/Terminal/010E047102812FC0C18890992854220E/MQL5/Files"

url = "https://www.forexfactory.com/"
# NOTE(review): the next line overrides the live URL with a local test copy.
# Comment it out to scrape the real site — confirm which one is intended.
url = "http://localhost/Forex/forexfactory.htm"


class ForexNewsScraper:
    """Continuously scrapes Forex Factory and emits report/alert files."""

    def __init__(self):
        self.scraper = cloudscraper.create_scraper()
        self.timezone = pytz.timezone("Asia/Karachi")  # Pakistan Time
        self.running = True
        self.next_event = None                 # (currency, time_str, datetime) of next upcoming event
        self.last_scrape_time = None
        self.countdown_thread = None
        self.last_event_triggered = None
        self.next_periodic_scrape = None
        self.next_safety_check = None
        self.processed_events = set()          # Track processed (currency, time) tuples
        self.news_window_active = False
        self.last_news_scrape_time = None
        self.news_window_scrape_count = 0      # Track number of scrapes during news window
        print("āœ… Initialized Forex News Scraper")

    def get_forex_news(self):
        """Fetch HTML from Forex Factory with forced timezone.

        Returns the page HTML as a string, or ``None`` on any failure.
        """
        try:
            print("🌐 Fetching Forex Factory page...", end="\r")
            # Force Pakistan timezone (UTC+5) via Forex Factory's cookies
            cookies = {
                'timezoneoffset': '-300',  # Pakistan Standard Time (UTC+5)
                'timezonename': 'Asia/Karachi',
                'fftimezone': 'Asia/Karachi'
            }
            headers = {
                'Accept-Language': 'en-US,en;q=0.9',
                'X-Requested-With': 'XMLHttpRequest'
            }
            response = self.scraper.get(url, cookies=cookies, headers=headers, timeout=10)
            if response.status_code == 200:
                print("āœ… Successfully fetched page with Pakistan timezone")
                return response.text
            print(f"āŒ Error: Status code {response.status_code}")
        except Exception as e:
            print(f"āŒ Fetch error: {str(e)}")
        return None

    def safe_get_text(self, element):
        """Safe extraction of text from HTML element (accepts str or Tag)."""
        if isinstance(element, str):
            return element.strip()
        return element.text.strip() if element else "N/A"

    def get_impact_level(self, impact_tag):
        """Determine impact level ("High"/"Medium"/"Low"/"N/A") from icon CSS classes."""
        if not impact_tag:
            return "N/A"
        impact_span = impact_tag.find("span")
        if impact_span and impact_span.has_attr("class"):
            impact_classes = impact_span["class"]
            if "icon--ff-impact-red" in impact_classes:
                return "High"
            elif "icon--ff-impact-ora" in impact_classes:
                return "Medium"
            elif "icon--ff-impact-yel" in impact_classes:
                return "Low"
        return "N/A"

    def calculate_percentage_diff(self, actual, forecast, status):
        """Calculate percentage difference between actual and forecast, handling K/M suffixes.

        The sign is forced positive for "Better" results and negative for
        "Worse" results. Returns ``None`` if the values cannot be parsed.
        """
        try:
            def clean_value(val):
                # FIX: 'K'/'M' are multipliers, not string substitutions.
                # The old replace('K', '000') turned "1.5K" into "1.5000" (= 1.5).
                if isinstance(val, str):
                    val = val.replace(',', '').replace('%', '').strip()
                    multiplier = 1.0
                    if val.endswith('K'):
                        multiplier, val = 1_000.0, val[:-1]
                    elif val.endswith('M'):
                        multiplier, val = 1_000_000.0, val[:-1]
                    return float(val) * multiplier if val else 0.0
                return float(val) if val else 0.0

            actual_num = clean_value(actual)
            forecast_num = clean_value(forecast)
            if forecast_num == 0:
                return 0.0
            raw_diff = ((actual_num - forecast_num) / forecast_num) * 100
            if "Better" in status:
                return abs(raw_diff)
            elif "Worse" in status:
                return -abs(raw_diff)
            return raw_diff
        except (ValueError, TypeError):
            return None

    def get_actual_result_status(self, actual_tag, forecast, event_time_str):
        """Classify a row's result ("Better"/"Worse"/"Neutral"/"Pending"/"N/A").

        Returns ``(status, percentage_diff)``. A missing/blank actual value is
        "Pending" until 2 minutes after the event time, then "N/A".
        """
        now = datetime.now(self.timezone)
        try:
            event_time = datetime.strptime(event_time_str.strip().lower(), "%I:%M%p")
            event_time = now.replace(hour=event_time.hour, minute=event_time.minute,
                                     second=0, microsecond=0)
        except Exception:
            # Non-clock labels ("All Day", "Tentative", ...) — treat as "now"
            event_time = now

        if not actual_tag:
            if now < event_time + timedelta(minutes=2):
                return "Pending", None
            else:
                return "N/A", None

        actual_text = self.safe_get_text(actual_tag)
        if actual_text in ["", "-", "—"]:
            if now < event_time + timedelta(minutes=2):
                return "Pending", None
            else:
                return "N/A", None

        # Better/worse is encoded as a CSS class either on the row or on a
        # span inside the actual cell; the span takes precedence.
        status = "Neutral"
        row = actual_tag.find_parent("tr")
        if row:
            classes = row.get("class", [])
            if "better" in classes:
                status = "Better"
            elif "worse" in classes:
                status = "Worse"
            elif "none" in classes:
                status = "Neutral"
        span = actual_tag.find("span")
        if span and span.has_attr("class"):
            classes = span["class"]
            if "better" in classes:
                status = "Better"
            elif "worse" in classes:
                status = "Worse"
            elif "none" in classes:
                status = "Neutral"

        percentage_diff = self.calculate_percentage_diff(actual_text, forecast, status)
        return status, percentage_diff

    def parse_news(self, html):
        """Parse calendar rows out of the page HTML into a list of dicts."""
        print("šŸ” Parsing news data...")
        soup = BeautifulSoup(html, "html.parser")
        news_list = []
        today_date = datetime.now(self.timezone).strftime("%b %d")

        rows = soup.find_all("tr", class_="calendar__row")
        if not rows:
            print("āš ļø Warning: No news rows found")
            return []

        # Date/time cells are only present on the first row of a group, so
        # carry the last seen values forward.
        current_date = None
        current_time = None
        for row in rows:
            try:
                date_tag = row.find("td", class_="calendar__date")
                time_tag = row.find("td", class_="calendar__time")
                currency_tag = row.find("td", class_="calendar__currency")
                impact_tag = row.find("td", class_="calendar__impact")
                event_tag = row.find("td", class_="calendar__event")
                actual_tag = row.find("td", class_="calendar__actual")
                forecast_tag = row.find("td", class_="calendar__forecast")
                previous_tag = row.find("td", class_="calendar__previous")

                if date_tag and date_tag.text.strip():
                    current_date = date_tag.text.strip()
                if time_tag and time_tag.text.strip():
                    current_time = time_tag.text.strip()
                if not event_tag or not event_tag.text.strip():
                    continue

                impact_level = self.get_impact_level(impact_tag)
                forecast = self.safe_get_text(forecast_tag)
                if forecast in ["", "-", "—"]:
                    forecast = "N/A"
                previous = self.safe_get_text(previous_tag)
                if previous in ["", "-", "—"]:
                    previous = "N/A"
                actual_text = self.safe_get_text(actual_tag)
                if actual_text in ["", "-", "—"]:
                    actual_text = ""

                status, percentage_diff = self.get_actual_result_status(
                    actual_tag, forecast, current_time)

                news_item = {
                    "date": current_date or "Unknown Date",
                    "time": current_time or "N/A",
                    "currency": self.safe_get_text(currency_tag),
                    "impact": impact_level,
                    "event": self.safe_get_text(event_tag),
                    "actual": actual_text if actual_text else status,  # Show 'Pending' or 'N/A' if needed
                    "actual_status": status,
                    "percentage_diff": percentage_diff,
                    "forecast": forecast,
                    "previous": previous
                }
                news_list.append(news_item)
            except Exception as e:
                print(f"āŒ Error parsing row: {str(e)}")
                continue

        print(f"šŸ“Š Found {len(news_list)} news items")
        return news_list

    def generate_report_content(self, grouped_events):
        """Generate the formatted content for reports (shared by all report types)."""
        gen_time = datetime.now(self.timezone)
        content = []

        # Report header
        content.append("Forex Factory News Report\n\n")
        content.append(f"Generated at: {gen_time.strftime('%Y-%m-%d %I:%M:%S %p')} (Pakistan Time)\n\n")

        # Group events by time so that currencies at the same time are together.
        time_based_sort = defaultdict(list)
        for (currency, time_slot), items in grouped_events.items():
            time_based_sort[time_slot].append((currency, items))

        # Custom sort key that handles both clock times and special labels:
        # "All Day" sorts last, unparseable labels sort first.
        def time_sort_key(ts):
            ts_lower = ts.lower().strip()
            if ts_lower == "all day":
                return datetime.max
            try:
                return datetime.strptime(ts_lower, "%I:%M%p")
            except Exception:
                return datetime.min

        sorted_time_slots = sorted(time_based_sort.keys(), key=time_sort_key)

        for time_slot in sorted_time_slots:
            entries = time_based_sort[time_slot]
            for currency, items in entries:
                content.append(f"=== {currency} at {time_slot} ===\n")

                # Count high impact news for this group
                high_impact_items = [item for item in items if item['impact'] == 'High']
                high_total = len(high_impact_items)
                high_counter = 0

                for item in items:
                    content.append(f"\nTime: {time_slot}\n")
                    content.append(f"Currency: {currency}\n")
                    content.append(f"Impact: {item['impact']}\n")
                    if item['impact'] == 'High':
                        high_counter += 1
                        content.append(f"High Impact News Number : {high_counter}/{high_total}\n")
                    elif high_total == 0:
                        content.append("High Impact News Number : N/A\n")
                    content.append(f"Event: {item['event']}\n")
                    content.append(f"Previous: {item['previous']}\n")
                    content.append(f"Forecast: {item['forecast']}\n")
                    content.append(f"Actual: {item['actual']}\n")
                    if item['percentage_diff'] is not None:
                        diff = item['percentage_diff']
                        content.append(f"Difference: {diff:+.2f}%\n")
                    else:
                        content.append("Difference: N/A\n")
                    result_status = item['actual_status'] if item['actual_status'] else "N/A"
                    content.append(f"Result: {result_status}\n")

                # Overall impact = average diff over released high-impact items
                relevant_highs = [
                    item for item in items
                    if item['impact'] == 'High'
                    and item['percentage_diff'] is not None
                    and item['actual_status'] != "Pending"
                ]
                if relevant_highs:
                    diffs = [item['percentage_diff'] for item in relevant_highs]
                    avg_diff = sum(diffs) / len(diffs)
                    if avg_diff >= 1.0:
                        status = "Better"
                    elif avg_diff <= -1.0:
                        status = "Worse"
                    else:
                        status = "Neutral"
                    overall_line = f"OVERALL IMPACT: {status} ({avg_diff:+.2f}%)"
                else:
                    overall_line = "OVERALL IMPACT: No qualified high-impact results yet"
                content.append(f"\n{overall_line}\n")
                content.append("\n" + "=" * 40 + "\n")

        return "".join(content)

    def create_hourly_report(self, grouped_events):
        """Create hourly report and the Next_News_Alert.txt file for MT4/MT5."""
        filename = "forex_news_hourly.txt"
        # FIX: f-string previously printed the literal text "(unknown)"
        print(f"\nšŸ“ Writing hourly report to {filename}")
        content = self.generate_report_content(grouped_events)
        with open(filename, "w", encoding="utf-8") as f:
            f.write(content)

        # āœ… Find next high-impact event time
        now = datetime.now(self.timezone)
        upcoming_times = {}
        for (currency, time_str), events in grouped_events.items():
            try:
                if not any(e["impact"] == "High" for e in events):
                    continue  # Skip non-high-impact events
                # Parse time
                event_time = datetime.strptime(time_str.lower(), "%I:%M%p")
                event_datetime = now.replace(hour=event_time.hour, minute=event_time.minute,
                                             second=0, microsecond=0)
                # Adjust to future if needed
                if event_datetime < now:
                    event_datetime += timedelta(days=1)
                # Store earliest time
                if event_datetime not in upcoming_times:
                    upcoming_times[event_datetime] = []
                upcoming_times[event_datetime].append((currency, time_str))
            except Exception as e:
                print(f"āŒ Error parsing time: {time_str} | {e}")

        if not upcoming_times:
            print("ā„¹ļø No upcoming high-impact events found.")
            return

        # Get earliest time
        next_time = min(upcoming_times.keys())
        next_time_str = upcoming_times[next_time][0][1]

        # Collect all currencies with high impact at that time
        high_impact_currencies = []
        for (currency, time_str), events in grouped_events.items():
            if time_str != next_time_str:
                continue
            if any(item['impact'] == 'High' for item in events):
                high_impact_currencies.append(currency)

        print(f"🧠 Next high-impact time: {next_time_str}")
        print(f"[DEBUG] High-impact currencies: {high_impact_currencies}")

        if high_impact_currencies:
            try:
                alert_path = "Next_News_Alert.txt"
                with open(alert_path, "w", encoding="utf-8") as f:
                    f.write(f"Time: {next_time_str}\n")
                    f.write("Impact: High\n")
                    f.write(f"Currency: {','.join(high_impact_currencies)}\n")
                print(f"āœ… Next_News_Alert.txt written")
                shutil.copy(alert_path, os.path.join(mt4_files_path, alert_path))
                print(f"āœ… Copied to MT4 folder: {mt4_files_path}")
                shutil.copy(alert_path, os.path.join(mt5_files_path, alert_path))
                print(f"āœ… Copied to MT5 folder: {mt5_files_path}")
            except Exception as e:
                print(f"āŒ Failed to write or copy Next_News_Alert.txt: {e}")
        else:
            print("ā„¹ļø No high-impact currencies found for next high-impact time.")

    def create_news_alert(self, grouped_events, current_time_str):
        """
        Create alert file per High Impact event at the current scrape time only.
        """
        try:
            now = datetime.now(self.timezone)
            date_str = now.strftime("%Y%m%d")

            def normalize_time(t):
                # Normalize "8:30am" -> "0830"; None for non-clock labels.
                try:
                    return datetime.strptime(t.strip().lower(), "%I:%M%p").strftime("%H%M")
                except ValueError:  # FIX: was a bare except
                    return None

            current_hhmm = normalize_time(current_time_str)

            for (currency, time_slot), events in grouped_events.items():
                if normalize_time(time_slot) != current_hhmm:
                    continue  # Only process current time slot
                for event in events:
                    if event['impact'] != "High":
                        continue  # Only high-impact news

                    time_hhmm = normalize_time(time_slot) or "0000"
                    clean_event = re.sub(r'[^\w\-]+', '-', event['event']).strip('-')
                    filename = f"News_Alert_{time_hhmm}_{currency}_{clean_event}.txt"

                    result_status = event['actual_status'] if event['actual_status'] else "N/A"
                    if event['percentage_diff'] is not None:
                        diff_line = f"Difference: {event['percentage_diff']:+.2f}%"
                    else:
                        diff_line = "Difference: N/A"

                    content_lines = [
                        "Forex Factory News Alert\n",
                        f"Generated at: {now.strftime('%Y-%m-%d %I:%M:%S %p')} (Pakistan Time)\n",
                        "",
                        f"Time: {time_slot}",
                        f"Currency: {currency}",
                        f"Impact: {event['impact']}"
                    ]

                    # Add High Impact News Number (position in total high-impact
                    # events for this currency/time)
                    same_slot_events = grouped_events.get((currency, time_slot), [])
                    high_impact_events = [e for e in same_slot_events if e['impact'] == 'High']
                    total_high = len(high_impact_events)
                    if total_high > 0:
                        current_index = (high_impact_events.index(event) + 1
                                         if event in high_impact_events else 1)
                        content_lines.append(f"High Impact News Number : {current_index}/{total_high}")
                    else:
                        content_lines.append("High Impact News Number : N/A")

                    # Continue with the rest
                    content_lines += [
                        f"Event: {event['event']}",
                        f"Previous: {event['previous']}",
                        f"Forecast: {event['forecast']}",
                        f"Actual: {event['actual']}",
                        diff_line,
                        f"Result: {result_status}\n",
                        "=" * 40
                    ]

                    with open(filename, "w", encoding="utf-8") as alert_file:
                        alert_file.write("\n".join(content_lines) + "\n")
                    # FIX: f-string previously printed the literal text "(unknown)"
                    print(f"āœ… Created alert file: {filename}")

                    try:
                        shutil.copy(filename, os.path.join(mt4_files_path, filename))
                        print(f"šŸ“¤ Copied to MT4 folder: {mt4_files_path}")
                        shutil.copy(filename, os.path.join(mt5_files_path, filename))
                        print(f"šŸ“¤ Copied to MT5 folder: {mt5_files_path}")
                    except Exception as copy_err:
                        print(f"āŒ Could not copy to MT4 folder: {copy_err}")

        except Exception as e:
            timestamp = datetime.now(self.timezone).strftime("%Y%m%d_%H%M")
            error_filename = f"News_Alert_ERROR_{timestamp}.txt"
            with open(error_filename, "w", encoding="utf-8") as f:
                f.write("Forex Factory News Alert Error\n\n")
                f.write(f"Error generating news alert: {str(e)}\n")
            print(f"āŒ Error creating news alert: {e}")

    def process_news_events(self, news_data, is_hourly=False):
        """Group parsed items by (currency, time), emit reports, track next event."""
        print("\nāš™ļø Processing news events...")

        # Group by currency and time
        events = defaultdict(list)
        for item in news_data:
            key = (item['currency'], item['time'])
            events[key].append(item)

        # Create reports based on scrape type
        if is_hourly:
            self.create_hourly_report(events)
        else:
            # For news event scrapes, create per-event alert files
            self.create_news_alert(events, datetime.now(self.timezone).strftime("%I:%M%p").lower())

        # Update next event tracking
        self.update_next_event(events)

    def update_next_event(self, grouped_events):
        """Update ``self.next_event`` with the soonest future (currency, time, datetime)."""
        now = datetime.now(self.timezone)
        next_event = None
        for (currency, time_str), items in grouped_events.items():
            try:
                if time_str.lower() == "tentative":
                    continue
                event_time = datetime.strptime(time_str.lower(), "%I:%M%p")
                event_datetime = now.replace(
                    hour=event_time.hour, minute=event_time.minute,
                    second=0, microsecond=0
                )
                if event_datetime > now and (next_event is None or event_datetime < next_event[2]):
                    next_event = (currency, time_str, event_datetime)
            except Exception:
                continue
        self.next_event = next_event

    def schedule_safety_check(self, currency, time_str):
        """Schedule a full scrape 1 minute after news event."""
        try:
            event_time = datetime.strptime(time_str.lower(), "%I:%M%p")
            safety_check_time = (datetime.now(self.timezone)
                                 .replace(hour=event_time.hour, minute=event_time.minute)
                                 + timedelta(minutes=1))
            self.next_safety_check = (currency, time_str, safety_check_time)
            print(f"ā± Safety check scheduled for {safety_check_time.strftime('%I:%M%p')}")
        except Exception as e:
            print(f"āŒ Error scheduling safety check: {str(e)}")

    def show_countdown(self):
        """Countdown/scheduler loop: hourly scrapes plus a 50-scrape news window.

        FIX: the original had two independent news-window triggers (an
        elapsed-seconds check and a pre-calculated ``_scrape_times`` check)
        both incrementing ``_news_window_count`` each pass, so scrapes could
        double-fire. Only the ``_scrape_times`` trigger remains; the safety
        check is still scheduled after scrape 50.
        """
        last_display = ""
        last_hourly_scrape = None
        while self.running:
            now = datetime.now(self.timezone)

            # ===== HOURLY TIMER ===== (scrape at minute :07 of every hour)
            next_hourly = now.replace(minute=7, second=0, microsecond=0)
            if next_hourly < now:
                next_hourly += timedelta(hours=1)
            hourly_countdown = next_hourly - now
            hh, hm = divmod(hourly_countdown.seconds // 60, 60)
            hs = hourly_countdown.seconds % 60
            hourly_str = f"{hh:01d}:{hm:02d}:{hs:02d}"

            # ===== NEWS EVENT DISPLAY =====
            news_display = ""
            in_news_window = hasattr(self, '_news_window_active')
            if in_news_window:
                # During news window, show countdown to next scrape (1s intervals)
                elapsed = (now - self._news_window_start).total_seconds()
                next_scrape_in = max(0, 1 - (elapsed % 1))
                news_display = f" | Next scrape in: {next_scrape_in:.1f}s"
            elif self.next_event:
                # Outside news window, show countdown to next news event
                currency, time_str, event_time = self.next_event
                news_countdown = event_time - now
                nh, nm = divmod(news_countdown.seconds // 60, 60)
                ns = news_countdown.seconds % 60
                details = self.get_event_details(currency, time_str)
                news_display = (f" | Next news: {currency} at {time_str} "
                                f"(in {nh:01d}:{nm:02d}:{ns:02d}){details}")

            # When news window starts:
            if (self.next_event and not hasattr(self, '_news_window_active')
                    and now >= self.next_event[2]):
                self._news_window_active = True
                self._news_window_start = now
                # Remember which event opened the window for the safety check,
                # since self.next_event is refreshed by each scrape.
                self._news_window_event = (self.next_event[0], self.next_event[1])
                # Pre-calculate all 50 scrape times at 1s intervals
                self._scrape_times = [now + timedelta(seconds=x) for x in range(50)]
                self._news_window_count = 0
                print(f"\n🚨 NEWS WINDOW STARTED")

            # During news window: single precision-scrape trigger
            if hasattr(self, '_news_window_active') and self._news_window_count < 50:
                if now >= self._scrape_times[self._news_window_count]:
                    self._news_window_count += 1
                    print(f"\nā° PRECISION SCRAPE {self._news_window_count}/50 "
                          f"at {now.strftime('%H:%M:%S.%f')[:-3]}")
                    self.run_scraper(is_hourly=False)
                    if self._news_window_count == 50:
                        self.schedule_safety_check(*self._news_window_event)
                        print("\nāœ… NEWS WINDOW COMPLETED")
                        # Clean up news window state
                        delattr(self, '_news_window_active')
                        delattr(self, '_scrape_times')
                        delattr(self, '_news_window_count')
                        # Force refresh to get updated events
                        self.run_scraper(is_hourly=True)

            # ===== HOURLY SCRAPE =====
            if now.minute == 7 and (last_hourly_scrape is None
                                    or (now - last_hourly_scrape).total_seconds() >= 3600):
                print(f"\nā° HOURLY SCRAPE at {now.strftime('%H:%M:%S')}")
                self.run_scraper(is_hourly=True)
                last_hourly_scrape = now

            # ===== DISPLAY ===== (only repaint when the text changed)
            current_display = f"ā³ Next hourly: {next_hourly.strftime('%I:%M%p').lower()} (in {hourly_str})"
            if in_news_window or self.next_event:
                current_display += news_display
            else:
                current_display += " | No upcoming news"
            if current_display != last_display:
                print(current_display, end="\r")
                last_display = current_display
            time.sleep(0.1)

    def get_event_details(self, currency, event_time):
        """Get additional details about the upcoming event from the hourly report file."""
        try:
            with open("forex_news_hourly.txt", "r") as f:
                content = f.read()
            pattern = rf"=== {currency} at {event_time} ===(.*?)(?:\n===|$)"
            match = re.search(pattern, content, re.DOTALL)
            if not match:
                return ""
            section = match.group(1)
            event_match = re.search(r"Event: (.*?)\n.*?Impact: (.*?)\n", section, re.DOTALL)
            if event_match:
                event_name, impact = event_match.groups()
                return f" | {event_name.strip()} ({impact})"
            return ""
        except Exception:
            return ""

    def run_scraper(self, is_hourly=True):
        """Main scraping function: fetch, parse, then process."""
        print("\nšŸ”„ Running scraper...")
        html = self.get_forex_news()
        if not html:
            print("āŒ Failed to fetch data")
            return
        news_data = self.parse_news(html)
        if not news_data:
            print("āŒ No news data found")
            return
        self.process_news_events(news_data, is_hourly=is_hourly)
        self.last_scrape_time = datetime.now(self.timezone)

    def start(self):
        """Start the scraper: initial hourly scrape, then the countdown thread."""
        print("\nšŸš€ Starting Forex News Scraper...")
        # Initial fetch - force is_hourly=True for startup
        self.run_scraper(is_hourly=True)
        # Start countdown thread (daemon so Ctrl+C exits cleanly)
        self.countdown_thread = Thread(target=self.show_countdown)
        self.countdown_thread.daemon = True
        self.countdown_thread.start()
        try:
            while self.running:
                time.sleep(1)
        except KeyboardInterrupt:
            self.running = False
            print("\nšŸ›‘ Script stopped by user")


if __name__ == "__main__":
    scraper = ForexNewsScraper()
    scraper.start()