""" Module d'envoi de rapports par email avec timeout strict """ import smtplib import os import socket import threading from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from email.mime.base import MIMEBase from email import encoders from pathlib import Path from datetime import datetime from typing import Optional, List class EmailReportSender: """Classe pour envoyer des rapports par email avec HTML et pieces jointes""" def __init__(self, smtp_server: str = None, smtp_port: int = None, from_email: str = None, to_email: str = None, use_tls: bool = False): """Initialise le sender avec la config SMTP""" self.smtp_server = smtp_server or os.getenv('SMTP_SERVER', 'localhost') self.smtp_port = int(smtp_port or os.getenv('SMTP_PORT', 25)) self.from_email = from_email or os.getenv('FROM_EMAIL', 'monitoring@example.com') self.to_email = to_email or os.getenv('TO_EMAIL', 'admin@example.com') self.use_tls = use_tls def send_simple_report(self, subject: str, html_body: str, text_body: Optional[str] = None, attachments: Optional[List[str]] = None, cc: Optional[str] = None, bcc: Optional[str] = None) -> bool: """Envoie un email simple avec rapport HTML""" try: msg = MIMEMultipart('alternative') msg['Subject'] = subject msg['From'] = self.from_email msg['To'] = self.to_email if cc: msg['Cc'] = cc if bcc: msg['Bcc'] = bcc if text_body: msg.attach(MIMEText(text_body, 'plain', 'utf-8')) else: text_fallback = self._html_to_text(html_body) msg.attach(MIMEText(text_fallback, 'plain', 'utf-8')) msg.attach(MIMEText(html_body, 'html', 'utf-8')) if attachments: for attachment_path in attachments: if Path(attachment_path).exists(): self._attach_file(msg, attachment_path) recipients = [self.to_email] if cc: recipients.extend([e.strip() for e in cc.split(',')]) if bcc: recipients.extend([e.strip() for e in bcc.split(',')]) msg_string = msg.as_string() return self._send_with_timeout(recipients, msg_string, timeout=5) except Exception as e: print(f"[ERROR] Error preparing email: {e}") return False def _send_with_timeout(self, recipients, msg_string, timeout=5): """Envoie email dans un thread avec timeout strict""" result = {'sent': False, 'error': None} def send_thread(): try: print(f" [INFO] Checking socket on {self.smtp_server}:{self.smtp_port}...") sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(timeout) sock.connect((self.smtp_server, self.smtp_port)) sock.close() with smtplib.SMTP(self.smtp_server, self.smtp_port, timeout=timeout) as server: if self.use_tls: server.starttls() server.sendmail(self.from_email, recipients, msg_string) result['sent'] = True except socket.timeout: result['error'] = f"Timeout: {self.smtp_server}:{self.smtp_port} no response in {timeout}s" except socket.gaierror: result['error'] = f"Host not found: {self.smtp_server}" except ConnectionRefusedError: result['error'] = f"Connection refused: {self.smtp_server}:{self.smtp_port}" except socket.error as e: result['error'] = f"Socket error: {e}" except smtplib.SMTPException as e: result['error'] = f"SMTP error: {e}" except Exception as e: result['error'] = f"Error: {type(e).__name__}: {e}" thread = threading.Thread(target=send_thread, daemon=True) thread.start() thread.join(timeout=timeout + 2) if thread.is_alive(): print(f"[ERROR] Timeout after {timeout + 2}s - server not responding") return False if result['sent']: print(f"[SUCCESS] Email sent successfully") return True else: print(f"[ERROR] {result['error']}") return False @staticmethod def _attach_file(msg, file_path: str): """Attach file to message""" part = MIMEBase('application', 'octet-stream') with open(file_path, 'rb') as attachment: part.set_payload(attachment.read()) encoders.encode_base64(part) part.add_header('Content-Disposition', f'attachment; filename={Path(file_path).name}') msg.attach(part) @staticmethod def _html_to_text(html): """Convert HTML to text""" import re text = re.sub('<[^<]+?>', '', html) text = re.sub(r'\n\s*\n', '\n\n', text) return text.strip() def _get_expiry_status_for_html(expiration_str): """Return (label, color_hex) based on days until expiry - matching CLI behavior""" try: from datetime import timezone exp = datetime.fromisoformat(expiration_str.replace("Z", "+00:00")) # Handle naive datetime (simple date without timezone info) if exp.tzinfo is None: exp = exp.replace(tzinfo=timezone.utc) now = datetime.now(timezone.utc) days = (exp - now).days if days < 0: return "[EXPIRED]", "#d32f2f" elif days < 3: return f"[CRITICAL – expires in {days}d]", "#d32f2f" elif days < 7: return f"[WARNING – expires in {days}d]", "#f57c00" else: return f"[OK – expires in {days}d]", "#388e3c" except (ValueError, AttributeError, TypeError): return "", "#000000" def generate_rundeck_tokens_report(tokens, base_url="", timestamp=None): """Generate HTML report for Rundeck tokens matching CLI display""" if not timestamp: timestamp = datetime.now().strftime("%d/%m/%Y %H:%M:%S") total = len(tokens) expired = sum(1 for t in tokens if t.get('expired')) active = total - expired html = f"""
Date: {timestamp}
Server: {base_url}