124 lines
4.6 KiB
Python
124 lines
4.6 KiB
Python
import os
import re
from html import unescape
from urllib.parse import urljoin, urlparse

import mysql.connector
import requests
from bs4 import BeautifulSoup
|
|
|
|
# MySQL connection details.
# Each value can be overridden through an environment variable; the literals
# are only fallbacks so that existing deployments keep working unchanged.
DB_HOST = os.environ.get("DOMAINCHECKER_DB_HOST", "192.168.178.201")
DB_USER = os.environ.get("DOMAINCHECKER_DB_USER", "gelbeseiten")
# NOTE(review): this fallback is a credential stored in source code. It should
# be rotated and supplied exclusively via DOMAINCHECKER_DB_PASSWORD.
DB_PASSWORD = os.environ.get("DOMAINCHECKER_DB_PASSWORD", "Gm4bBE62gXCSVVY2")
DB_NAME = os.environ.get("DOMAINCHECKER_DB_NAME", "domainchecker")
|
|
|
|
def connect_to_database():
    """Open a new MySQL connection using the module-level DB_* settings.

    Returns:
        The connection object produced by ``mysql.connector.connect``.
    """
    connection_settings = {
        "host": DB_HOST,
        "user": DB_USER,
        "password": DB_PASSWORD,
        "database": DB_NAME,
    }
    return mysql.connector.connect(**connection_settings)
|
|
|
|
def get_valid_websites(cursor):
    """Fetch the rows that have a website but no e-mail address yet.

    Selects ``id`` and ``domain`` from ``gelbeseiten`` where ``domain`` is not
    the placeholder 'Keine Webseite' and ``mailadresse`` still holds the
    placeholder 'Keine E-Mail', ordered by ``id`` descending.

    Args:
        cursor: An open database cursor providing ``execute`` and ``fetchall``.

    Returns:
        Whatever ``cursor.fetchall()`` yields for that query.
    """
    query = "SELECT `id`, `domain` FROM `gelbeseiten` WHERE `domain` != 'Keine Webseite' AND `mailadresse` = 'Keine E-Mail' ORDER BY id DESC"
    cursor.execute(query)
    rows = cursor.fetchall()
    return rows
|
|
|
|
def update_email_in_database(cursor, record_id, email):
    """Store *email* in the ``mailadresse`` column of one ``gelbeseiten`` row.

    Addresses longer than 255 characters are reported on stdout and cut down
    to the first 255 characters before being written. Database errors raised
    by the UPDATE are printed and swallowed; nothing is returned either way.

    Args:
        cursor: An open database cursor providing ``execute``.
        record_id: Value matched against the row's ``id`` column.
        email: The e-mail address to store.
    """
    max_length = 255  # longest value written to the column
    too_long = len(email) > max_length
    if too_long:
        print(f"E-Mail-Adresse zu lang: {email}")
    stored_email = email[:max_length] if too_long else email

    statement = "UPDATE `gelbeseiten` SET `mailadresse` = %s WHERE `id` = %s"
    try:
        cursor.execute(statement, (stored_email, record_id))
    except mysql.connector.Error as db_error:
        print(f"Fehler beim Speichern der E-Mail-Adresse {stored_email}: {db_error}")
|
|
|
|
def get_impressum_url(domain):
    """Try to find the imprint (Impressum) URL of a website.

    Each candidate path is joined onto *domain* and probed with a HEAD
    request; the first candidate that ends in HTTP 200 is returned.

    Args:
        domain: Base URL of the site, including the scheme
            (e.g. ``"http://example.de"``).

    Returns:
        The first candidate URL that answered with status 200, or ``None``
        when no candidate did.
    """
    # Dedicated imprint/contact pages come first. The two homepage fallbacks
    # ("/#impressum" and "") are tried last: the homepage nearly always
    # answers 200, so placing it earlier would hide every candidate after it.
    possible_paths = [
        "/impressum",
        "/Impressum",
        "/legal",
        "/kontakt",
        "/about-us",
        "/impressum.php",
        "/imprint",
        "/impressum.html",
        "/impressum.htm",
        "/html/impressum.html",
        "/index.php/impressum",
        "/info/impressum/",
        "/Impressum.html",
        "/recht/impressum",
        "/web/impressum/impressum.html",
        "/#impressum",
        "",
    ]

    for path in possible_paths:
        impressum_url = urljoin(domain, path)
        try:
            # requests.head() does not follow redirects unless asked to, so a
            # plain http -> https (or www) redirect would be reported as
            # 301/302 and the page would wrongly be treated as missing.
            response = requests.head(
                impressum_url, timeout=5, allow_redirects=True
            )
        except requests.RequestException:
            continue
        if response.status_code == 200:
            return impressum_url

    return None
|
|
|
|
def scrape_email_from_html(html):
    """Extract the first e-mail address found in an HTML document.

    ``mailto:`` links are checked first; if none of them yields an address,
    the whole (entity-decoded) markup is searched with a regular expression.

    Args:
        html: The HTML markup as a string.

    Returns:
        The first address found, or ``None`` when the document contains none.
    """
    # Function-scope import so this block does not depend on the file header.
    from urllib.parse import unquote

    # Decode HTML entities so obfuscated addresses (e.g. "&#64;") become visible.
    decoded_html = unescape(html)

    # First pass: mailto links. The scheme is matched case-insensitively.
    soup = BeautifulSoup(decoded_html, "html.parser")
    mailto_prefix = re.compile(r"^\s*mailto:", re.IGNORECASE)
    for link in soup.find_all("a", href=mailto_prefix):
        target = mailto_prefix.sub("", link["href"], count=1)
        # A mailto URI may carry header fields after "?" (subject, body, ...)
        # and several comma-separated recipients; keep the first address only.
        target = target.split("?", 1)[0].split(",", 1)[0]
        # Percent-decoding happens after splitting so that encoded "?" or ","
        # characters inside the address are not mistaken for separators.
        email = unquote(target).strip()
        if "@" in email:
            return email

    # Second pass: plain-text addresses anywhere in the markup.
    email_regex = r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}"
    match = re.search(email_regex, decoded_html)
    if match:
        return match.group(0)

    return None
|
|
|
|
def scrape_email_from_url(url):
    """Download *url* and extract an e-mail address from the returned HTML.

    The request is sent with a desktop-browser User-Agent and a 10 second
    timeout. Any non-200 answer and any exception raised while fetching or
    parsing is printed, and ``None`` is returned in those cases.

    Args:
        url: Address of the page to download.

    Returns:
        The result of ``scrape_email_from_html`` for the page body, or
        ``None`` on a non-200 status or an error.
    """
    browser_headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"
    }
    try:
        page = requests.get(url, headers=browser_headers, timeout=10)
        if page.status_code == 200:
            return scrape_email_from_html(page.text)
        print(f"Fehler beim Abrufen von {url}: {page.status_code}")
    except Exception as error:
        print(f"Fehler beim Abrufen oder Verarbeiten von {url}: {error}")
    return None
|
|
|
|
if __name__ == "__main__":
    # Script entry point: look up an imprint page for every row that has a
    # website but no e-mail address yet, and store the address found there.
    conn = connect_to_database()
    cursor = conn.cursor()

    try:
        # Only rows with a real website and a missing e-mail address.
        websites = get_valid_websites(cursor)

        for record_id, domain in websites:
            # Add a default scheme so the value can be used as a base URL.
            if not domain.startswith("http"):
                domain = f"http://{domain}"

            print(f"Suche Impressum für: {domain}")

            # Scraping failures are isolated per record so that one malformed
            # domain cannot abort the whole run.
            try:
                impressum_url = get_impressum_url(domain)
                if not impressum_url:
                    print(f"Kein Impressum gefunden für {domain}.")
                    continue

                print(f"Gefundene Impressum-URL: {impressum_url}")
                email = scrape_email_from_url(impressum_url)
            except Exception as e:
                print(f"Fehler bei {domain} (ID {record_id}): {e}")
                continue

            if not email:
                print(f"Keine E-Mail-Adresse im Impressum für {domain} gefunden.")
                continue

            update_email_in_database(cursor, record_id, email)
            # Commit after every record: with a single commit at the very end,
            # a crash late in a long run would discard all earlier updates.
            conn.commit()
            print(f"E-Mail-Adresse {email} für ID {record_id} gespeichert.")
    except Exception as e:
        # Database-level failures (query, commit) end the run here.
        print(f"Fehler: {e}")
    finally:
        cursor.close()
        conn.close()
|