Erster Push

Christopher Meinhold 2025-01-25 00:08:28 +01:00
commit 61fb594f61
9 changed files with 1166 additions and 0 deletions


@@ -0,0 +1,65 @@
import re
import mysql.connector

# MySQL connection details
DB_HOST = "192.168.178.201"
DB_USER = "gelbeseiten"
DB_PASSWORD = "Gm4bBE62gXCSVVY2"
DB_NAME = "domainchecker"

# Open the database connection
def connect_to_database():
    return mysql.connector.connect(
        host=DB_HOST,
        user=DB_USER,
        password=DB_PASSWORD,
        database=DB_NAME
    )

# Read all rows that still carry a raw address string
def get_data_from_database(cursor):
    cursor.execute("SELECT id, adresse FROM gelbeseiten WHERE adresse IS NOT NULL AND adresse != ''")
    return cursor.fetchall()

# Write the split address (street, PLZ, city) back to the row
def update_address_in_database(cursor, record_id, street, plz, city):
    cursor.execute("UPDATE `gelbeseiten` SET adresse = %s, plz = %s, ort = %s WHERE `id` = %s", (street, plz, city, record_id))

# Main processing routine
def process_addresses_and_update_db(conn):
    cursor = conn.cursor()
    # Fetch the records
    records = get_data_from_database(cursor)
    print(f"{len(records)} Einträge gefunden.")
    # Regex pattern: "street, 5-digit PLZ, city, distance in km"
    pattern = r"^(.*?),\s+(\d{5})\s+([^0-9]+)\s+\d+,\d+\s+km$"
    for record in records:
        record_id, address_text = record
        match = re.match(pattern, address_text)
        if match:
            street = match.group(1).strip()  # street
            plz = match.group(2)  # postal code
            city = match.group(3).replace("\t", "").replace("\n", "").strip()  # city
            # Update the address in the database
            update_address_in_database(cursor, record_id, street, plz, city)
            conn.commit()
        else:
            print(f"ID {record_id} - Kein Match für Adresse: {address_text}")
    conn.commit()
    cursor.close()
    print("Alle Adressen wurden verarbeitet.")

if __name__ == "__main__":
    # Connect to the database
    conn = connect_to_database()
    try:
        process_addresses_and_update_db(conn)
    except Exception as e:
        print(f"Fehler: {e}")
    finally:
        conn.close()

gelbeseiten_Branche.py Normal file

@@ -0,0 +1,78 @@
import requests
from bs4 import BeautifulSoup
import mysql.connector

# MySQL connection details
DB_HOST = "192.168.178.201"
DB_USER = "gelbeseiten"
DB_PASSWORD = "Gm4bBE62gXCSVVY2"
DB_NAME = "domainchecker"

def connect_to_database():
    return mysql.connector.connect(
        host=DB_HOST,
        user=DB_USER,
        password=DB_PASSWORD,
        database=DB_NAME
    )

def get_valid_data_realids(cursor):
    # Fetch the id / data-realid pairs that still have no branche (business category)
    cursor.execute("SELECT `id`, `data-realid` FROM `gelbeseiten` WHERE `branche` IS NULL OR `branche` = ''")
    return cursor.fetchall()

def update_branche_in_database(cursor, record_id, branche):
    # Update the branche column for the record
    cursor.execute("UPDATE `gelbeseiten` SET `branche` = %s WHERE `id` = %s", (branche, record_id))

def scrape_branche_by_realid(data_realid):
    """Extract the branche (business category) via the data-realid."""
    base_url = f"https://www.gelbeseiten.de/gsbiz/{data_realid}"
    try:
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"
        }
        response = requests.get(base_url, headers=headers, timeout=10)
        if response.status_code != 200:
            print(f"Fehler beim Abrufen von {base_url}: {response.status_code}")
            return None
        soup = BeautifulSoup(response.text, "html.parser")
        # Look for the branche tag
        branche_tag = soup.find("span", {"data-selenium": "teilnehmerkopf__branche"})
        if branche_tag:
            return branche_tag.text.strip()
        print(f"Keine Branche auf {base_url} gefunden.")
        return None
    except Exception as e:
        print(f"Fehler beim Verarbeiten von {base_url}: {e}")
        return None

if __name__ == "__main__":
    conn = connect_to_database()
    cursor = conn.cursor()
    try:
        # Fetch the entries that still need a branche
        records = get_valid_data_realids(cursor)
        for record_id, data_realid in records:
            print(f"Scrape Branche für data-realid: {data_realid}")
            branche = scrape_branche_by_realid(data_realid)
            if branche:
                # Store the branche in the database
                update_branche_in_database(cursor, record_id, branche)
                print(f"Branche '{branche}' für ID {record_id} gespeichert.")
            else:
                print(f"Keine Branche für data-realid {data_realid} gefunden.")
        # Persist the changes
        conn.commit()
    except Exception as e:
        print(f"Fehler: {e}")
    finally:
        cursor.close()
        conn.close()

mailadresse_finden.py Normal file

@@ -0,0 +1,88 @@
import requests
from bs4 import BeautifulSoup
import mysql.connector
import re

# MySQL connection details
DB_HOST = "192.168.178.201"
DB_USER = "gelbeseiten"
DB_PASSWORD = "Gm4bBE62gXCSVVY2"
DB_NAME = "domainchecker"

def connect_to_database():
    return mysql.connector.connect(
        host=DB_HOST,
        user=DB_USER,
        password=DB_PASSWORD,
        database=DB_NAME
    )

def get_valid_websites(cursor):
    # Fetch websites that are not marked "Keine Webseite" and still have no e-mail address
    cursor.execute("SELECT `id`, `domain` FROM `gelbeseiten` WHERE `domain` != 'Keine Webseite' AND `mailadresse` = 'Keine E-Mail'")
    return cursor.fetchall()

def update_email_in_database(cursor, record_id, email):
    # Update the e-mail address for the record
    cursor.execute("UPDATE `gelbeseiten` SET `mailadresse` = %s WHERE `id` = %s", (email, record_id))

def scrape_email_from_website(url):
    try:
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"
        }
        response = requests.get(url, headers=headers, timeout=10)
        if response.status_code != 200:
            print(f"Fehler beim Abrufen von {url}: {response.status_code}")
            return None
        soup = BeautifulSoup(response.text, "html.parser")
        # Look for an e-mail address in a mailto: link
        email_tag = soup.find("a", href=re.compile(r"^mailto:"))
        if email_tag:
            email = email_tag["href"].replace("mailto:", "").strip()
            # Check that the address contains an '@'
            if "@" in email:
                return email
            else:
                print(f"Ungültige E-Mail-Adresse gefunden: {email}")
                return None
        print(f"Keine E-Mail-Adresse auf {url} gefunden.")
        return None
    except Exception as e:
        print(f"Fehler beim Verarbeiten der Webseite {url}: {e}")
        return None

if __name__ == "__main__":
    conn = connect_to_database()
    cursor = conn.cursor()
    try:
        # Fetch only records with a usable website
        websites = get_valid_websites(cursor)
        for record_id, domain in websites:
            # Normalise the URL
            if not domain.startswith("http"):
                url = f"http://{domain}"  # add a default protocol
            else:
                url = domain
            print(f"Scrape E-Mail-Adresse von: {url}")
            # Scrape the e-mail address
            email = scrape_email_from_website(url)
            if email:
                # Store the e-mail address in the database
                update_email_in_database(cursor, record_id, email)
                print(f"E-Mail-Adresse {email} für ID {record_id} gespeichert.")
        # Persist the changes
        conn.commit()
    except Exception as e:
        print(f"Fehler: {e}")
    finally:
        cursor.close()
        conn.close()


@@ -0,0 +1,137 @@
import requests
from bs4 import BeautifulSoup
import re
import mysql.connector
from urllib.parse import urljoin

# MySQL connection details
DB_HOST = "192.168.178.201"
DB_USER = "gelbeseiten"
DB_PASSWORD = "Gm4bBE62gXCSVVY2"
DB_NAME = "domainchecker"

def connect_to_database():
    return mysql.connector.connect(
        host=DB_HOST,
        user=DB_USER,
        password=DB_PASSWORD,
        database=DB_NAME
    )

def get_valid_websites(cursor):
    # Fetch websites that are not marked "Keine Webseite" and still have no e-mail address
    cursor.execute("SELECT `id`, `domain` FROM `gelbeseiten` WHERE `domain` != 'Keine Webseite' AND `mailadresse` = 'Keine E-Mail' ORDER BY id DESC")
    return cursor.fetchall()

def update_email_in_database(cursor, record_id, email):
    """Update the e-mail address for the record, with a length check."""
    MAX_EMAIL_LENGTH = 255  # maximum length of the e-mail column in the DB
    if len(email) > MAX_EMAIL_LENGTH:
        print(f"E-Mail-Adresse zu lang: {email}")
        email = email[:MAX_EMAIL_LENGTH]  # truncate to fit the column
    try:
        cursor.execute("UPDATE `gelbeseiten` SET `mailadresse` = %s WHERE `id` = %s", (email, record_id))
    except mysql.connector.Error as e:
        print(f"Fehler beim Speichern der E-Mail-Adresse {email}: {e}")

def get_impressum_url(domain):
    """Try to locate the Impressum (legal notice) URL."""
    possible_paths = ["/impressum", "/Impressum", "/legal", "/kontakt", "/about-us", "/impressum.php", "/imprint", "/impressum.html", "/impressum.htm", "/about", "", "/html/impressum.html", "/index.php/impressum", "/info/impressum/", "/kontakt", "/#impressum", "/Impressum.html", "/recht/impressum", "/web/impressum/impressum.html"]
    for path in possible_paths:
        impressum_url = urljoin(domain, path)
        try:
            response = requests.head(impressum_url, timeout=5)
            if response.status_code == 200:
                return impressum_url
        except requests.RequestException:
            continue
    # Fall back to the sitemap and look for an Impressum entry there
    sitemap_url = urljoin(domain, "/sitemap.xml")
    try:
        response = requests.get(sitemap_url, timeout=10)
        if response.status_code == 200:
            soup = BeautifulSoup(response.content, "xml")
            urls = soup.find_all("loc")
            for loc in urls:
                if "impressum" in loc.text.lower():
                    return loc.text
    except requests.RequestException:
        pass
    return None

def scrape_email_from_impressum(url):
    """Extract the e-mail address from the Impressum, including addresses given as plain text."""
    try:
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"
        }
        response = requests.get(url, headers=headers, timeout=10)
        if response.status_code != 200:
            print(f"Fehler beim Abrufen des Impressums von {url}: {response.status_code}")
            return None
        soup = BeautifulSoup(response.text, "html.parser")
        # Look for e-mail addresses in mailto: links
        email_tags = soup.find_all("a", href=re.compile(r"^mailto:"))
        for email_tag in email_tags:
            email = email_tag["href"].replace("mailto:", "").strip()
            if "@" in email:
                return email
        # Look for e-mail addresses in the page text (regex)
        email_regex = r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}"
        body_text = soup.get_text()
        found_emails = re.findall(email_regex, body_text)
        for email in found_emails:
            # Additional validation could go here if needed
            if "@" in email:
                return email
        print(f"Keine E-Mail-Adresse im Impressum von {url} gefunden.")
        return None
    except Exception as e:
        print(f"Fehler beim Verarbeiten des Impressums {url}: {e}")
        return None

if __name__ == "__main__":
    conn = connect_to_database()
    cursor = conn.cursor()
    try:
        # Fetch only records with a usable website
        websites = get_valid_websites(cursor)
        for record_id, domain in websites:
            # Normalise the URL
            if not domain.startswith("http"):
                domain = f"http://{domain}"  # add a default protocol
            print(f"Suche Impressum für: {domain}")
            impressum_url = get_impressum_url(domain)
            if impressum_url:
                print(f"Gefundene Impressum-URL: {impressum_url}")
                # Scrape the e-mail address from the Impressum
                email = scrape_email_from_impressum(impressum_url)
                if email:
                    # Store the e-mail address in the database
                    update_email_in_database(cursor, record_id, email)
                    print(f"E-Mail-Adresse {email} für ID {record_id} gespeichert.")
                else:
                    print(f"Keine E-Mail-Adresse im Impressum für {domain} gefunden.")
            else:
                print(f"Kein Impressum gefunden für {domain}.")
        # Persist the changes
        conn.commit()
    except Exception as e:
        print(f"Fehler: {e}")
    finally:
        cursor.close()
        conn.close()


@@ -0,0 +1,117 @@
import requests
from bs4 import BeautifulSoup
import re
import mysql.connector
from urllib.parse import urljoin

# MySQL connection details
DB_HOST = "192.168.178.201"
DB_USER = "gelbeseiten"
DB_PASSWORD = "Gm4bBE62gXCSVVY2"
DB_NAME = "domainchecker"

def connect_to_database():
    return mysql.connector.connect(
        host=DB_HOST,
        user=DB_USER,
        password=DB_PASSWORD,
        database=DB_NAME
    )

def get_valid_websites(cursor):
    # Fetch websites that are not marked "Keine Webseite" and still have no e-mail address
    cursor.execute("SELECT `id`, `domain` FROM `gelbeseiten` WHERE `domain` != 'Keine Webseite' AND `mailadresse` = 'Keine E-Mail'")
    return cursor.fetchall()

def update_email_in_database(cursor, record_id, email):
    # Update the e-mail address for the record
    cursor.execute("UPDATE `gelbeseiten` SET `mailadresse` = %s WHERE `id` = %s", (email, record_id))

def get_impressum_url(domain):
    """Try to locate the Impressum (legal notice) URL."""
    possible_paths = ["/impressum", "/Impressum", "/legal", "/kontakt", "/about-us", "/impressum.php", "/imprint", "/impressum.html", "/impressum.htm", "/about", "", "/html/impressum.html", "/index.php/impressum", "/info/impressum/", "/kontakt", "/#impressum", "/Impressum.html", "/recht/impressum"]
    for path in possible_paths:
        impressum_url = urljoin(domain, path)
        try:
            response = requests.head(impressum_url, timeout=5)
            if response.status_code == 200:
                return impressum_url
        except requests.RequestException:
            continue
    # Fall back to the sitemap and look for an Impressum entry there
    sitemap_url = urljoin(domain, "/sitemap.xml")
    try:
        response = requests.get(sitemap_url, timeout=10)
        if response.status_code == 200:
            soup = BeautifulSoup(response.content, "xml")
            urls = soup.find_all("loc")
            for loc in urls:
                if "impressum" in loc.text.lower():
                    return loc.text
    except requests.RequestException:
        pass
    return None

def scrape_email_from_impressum(url):
    """Extract the e-mail address from the Impressum."""
    try:
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"
        }
        response = requests.get(url, headers=headers, timeout=10)
        if response.status_code != 200:
            print(f"Fehler beim Abrufen des Impressums von {url}: {response.status_code}")
            return None
        soup = BeautifulSoup(response.text, "html.parser")
        # Look for e-mail addresses in mailto: links
        email_tags = soup.find_all("a", href=re.compile(r"^mailto:"))
        for email_tag in email_tags:
            email = email_tag["href"].replace("mailto:", "").strip()
            if "@" in email:
                return email
        print(f"Keine E-Mail-Adresse im Impressum von {url} gefunden.")
        return None
    except Exception as e:
        print(f"Fehler beim Verarbeiten des Impressums {url}: {e}")
        return None

if __name__ == "__main__":
    conn = connect_to_database()
    cursor = conn.cursor()
    try:
        # Fetch only records with a usable website
        websites = get_valid_websites(cursor)
        for record_id, domain in websites:
            # Normalise the URL
            if not domain.startswith("http"):
                domain = f"http://{domain}"  # add a default protocol
            print(f"Suche Impressum für: {domain}")
            impressum_url = get_impressum_url(domain)
            if impressum_url:
                print(f"Gefundene Impressum-URL: {impressum_url}")
                # Scrape the e-mail address from the Impressum
                email = scrape_email_from_impressum(impressum_url)
                if email:
                    # Store the e-mail address in the database
                    update_email_in_database(cursor, record_id, email)
                    print(f"E-Mail-Adresse {email} für ID {record_id} gespeichert.")
                else:
                    print(f"Keine E-Mail-Adresse im Impressum für {domain} gefunden.")
            else:
                print(f"Kein Impressum gefunden für {domain}.")
        # Persist the changes
        conn.commit()
    except Exception as e:
        print(f"Fehler: {e}")
    finally:
        cursor.close()
        conn.close()

mailadresse_v5.py Normal file

@@ -0,0 +1,123 @@
import requests
from bs4 import BeautifulSoup
import re
import mysql.connector
from urllib.parse import urljoin
from html import unescape

# MySQL connection details
DB_HOST = "192.168.178.201"
DB_USER = "gelbeseiten"
DB_PASSWORD = "Gm4bBE62gXCSVVY2"
DB_NAME = "domainchecker"

def connect_to_database():
    return mysql.connector.connect(
        host=DB_HOST,
        user=DB_USER,
        password=DB_PASSWORD,
        database=DB_NAME
    )

def get_valid_websites(cursor):
    # Fetch websites that are not marked "Keine Webseite" and still have no e-mail address
    cursor.execute("SELECT `id`, `domain` FROM `gelbeseiten` WHERE `domain` != 'Keine Webseite' AND `mailadresse` = 'Keine E-Mail' ORDER BY id DESC")
    return cursor.fetchall()

def update_email_in_database(cursor, record_id, email):
    """Update the e-mail address for the record, with a length check."""
    MAX_EMAIL_LENGTH = 255  # maximum length of the e-mail column in the DB
    if len(email) > MAX_EMAIL_LENGTH:
        print(f"E-Mail-Adresse zu lang: {email}")
        email = email[:MAX_EMAIL_LENGTH]  # truncate to fit the column
    try:
        cursor.execute("UPDATE `gelbeseiten` SET `mailadresse` = %s WHERE `id` = %s", (email, record_id))
    except mysql.connector.Error as e:
        print(f"Fehler beim Speichern der E-Mail-Adresse {email}: {e}")

def get_impressum_url(domain):
    """Try to locate the Impressum (legal notice) URL."""
    possible_paths = ["/impressum", "/Impressum", "/legal", "/kontakt", "/about-us", "/impressum.php", "/imprint", "/impressum.html", "/impressum.htm", "", "/html/impressum.html", "/index.php/impressum", "/info/impressum/", "/kontakt", "/#impressum", "/Impressum.html", "/recht/impressum", "/web/impressum/impressum.html"]
    for path in possible_paths:
        impressum_url = urljoin(domain, path)
        try:
            response = requests.head(impressum_url, timeout=5)
            if response.status_code == 200:
                return impressum_url
        except requests.RequestException:
            continue
    return None

def scrape_email_from_html(html):
    """Extract e-mail addresses from the HTML content."""
    # Decode HTML entities before searching
    decoded_html = unescape(html)
    # Look for mailto: links first
    soup = BeautifulSoup(decoded_html, "html.parser")
    email_tags = soup.find_all("a", href=re.compile(r"^mailto:"))
    for email_tag in email_tags:
        email = email_tag["href"].replace("mailto:", "").strip()
        if "@" in email:
            return email
    # Then look for plain-text e-mail addresses
    email_regex = r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}"
    found_emails = re.findall(email_regex, decoded_html)
    if found_emails:
        return found_emails[0]  # return the first address found
    return None

def scrape_email_from_url(url):
    """Fetch the HTML content and extract e-mail addresses."""
    try:
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"
        }
        response = requests.get(url, headers=headers, timeout=10)
        if response.status_code != 200:
            print(f"Fehler beim Abrufen von {url}: {response.status_code}")
            return None
        return scrape_email_from_html(response.text)
    except Exception as e:
        print(f"Fehler beim Abrufen oder Verarbeiten von {url}: {e}")
        return None

if __name__ == "__main__":
    conn = connect_to_database()
    cursor = conn.cursor()
    try:
        # Fetch only records with a usable website
        websites = get_valid_websites(cursor)
        for record_id, domain in websites:
            # Normalise the URL
            if not domain.startswith("http"):
                domain = f"http://{domain}"  # add a default protocol
            print(f"Suche Impressum für: {domain}")
            impressum_url = get_impressum_url(domain)
            if impressum_url:
                print(f"Gefundene Impressum-URL: {impressum_url}")
                email = scrape_email_from_url(impressum_url)
                if email:
                    update_email_in_database(cursor, record_id, email)
                    print(f"E-Mail-Adresse {email} für ID {record_id} gespeichert.")
                else:
                    print(f"Keine E-Mail-Adresse im Impressum für {domain} gefunden.")
            else:
                print(f"Kein Impressum gefunden für {domain}.")
        # Persist the changes
        conn.commit()
    except Exception as e:
        print(f"Fehler: {e}")
    finally:
        cursor.close()
        conn.close()

scrapper_Adresse_v2.py Normal file

@@ -0,0 +1,191 @@
import requests
from bs4 import BeautifulSoup
import mysql.connector
import time
import re

# MySQL connection details
DB_HOST = "192.168.178.201"
DB_USER = "gelbeseiten"
DB_PASSWORD = "Gm4bBE62gXCSVVY2"
DB_NAME = "domainchecker"

def connect_to_database():
    return mysql.connector.connect(
        host=DB_HOST,
        user=DB_USER,
        password=DB_PASSWORD,
        database=DB_NAME
    )

def get_data_from_table(cursor, table_name, column_name):
    cursor.execute(f"SELECT `{column_name}` FROM `{table_name}`")
    return [row[0] for row in cursor.fetchall()]

def insert_into_gelbeseiten(cursor, data):
    sql = """
        INSERT INTO gelbeseiten
        (`data-realid`, `firmenname`, `domain`, `mailadresse`, `adresse`, `plz`, `ort`, `telefonnummer`, `ansprechpartner`)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    cursor.execute(sql, data)

def check_if_realid_exists(cursor, realid):
    cursor.execute("SELECT 1 FROM gelbeseiten WHERE `data-realid` = %s", (realid,))
    return cursor.fetchone() is not None

def split_address(address):
    # Split the address into street + house number, PLZ and city
    address = address.strip().replace("\n", " ")
    # Assumption: the address has the format "Straße Hausnummer, PLZ Ort"
    # Example: "Musterstraße 1, 12345 Dresden"
    address_parts = re.split(r',\s*', address)
    if len(address_parts) == 2:
        street_and_number = address_parts[0].strip()
        plz_and_city = address_parts[1].strip()
        # Split PLZ and city
        plz_city_parts = re.split(r'\s+', plz_and_city)
        plz = plz_city_parts[0]
        city = " ".join(plz_city_parts[1:]) if len(plz_city_parts) > 1 else ""
        return street_and_number, plz, city
    else:
        # If the address does not match the expected format, return placeholder values
        return address, "Unbekannt", "Unbekannt"

def scrape_gelbeseiten(search_term, location, radius_km=50000):
    base_url = "https://www.gelbeseiten.de/suche"
    search_url = f"{base_url}/{search_term}/{location}?umkreis={radius_km}"
    print(f"Scrape {search_term} in {location}...")
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"
    }
    response = requests.get(search_url, headers=headers)
    if response.status_code != 200:
        print(f"Fehler beim Abrufen der Suchergebnisse für {search_term} in {location}.")
        return []
    soup = BeautifulSoup(response.text, "html.parser")
    articles = soup.find_all("article", class_="mod mod-Treffer")
    results = []
    for article in articles:
        try:
            realid = article.get("data-realid")
            if not realid:
                continue
            name_tag = article.find("h2", class_="mod-Treffer__name")
            name = name_tag.text.strip() if name_tag else "Unbekannt"
            address_tag = article.find("div", class_="mod-AdresseKompakt__adress-text")
            address = address_tag.text.strip().replace("\n", " ") if address_tag else "Keine Adresse"
            street, plz, city = split_address(address)
            phone_tag = article.find("a", class_="mod-TelefonnummerKompakt__phoneNumber")
            phone = phone_tag.text.strip() if phone_tag else "Keine Telefonnummer"
            detail_url = f"https://www.gelbeseiten.de/gsbiz/{realid}"
            detail_data = scrape_detail_page(detail_url, headers)
            results.append({
                "realid": realid,
                "name": name,
                "street": street,
                "plz": plz,
                "city": city,
                "phone": phone,
                "website": detail_data.get("website", "Keine Webseite"),
                "email": detail_data.get("email", "Keine E-Mail"),
                "contact": detail_data.get("contact", "Unbekannt"),
            })
            print(f"Gescrapt: {name}")
            time.sleep(1)
        except Exception as e:
            print(f"Fehler beim Verarbeiten eines Eintrags: {e}")
    return results

def scrape_detail_page(url, headers):
    """Scrape the detail page for website, e-mail, contact person and branche."""
    response = requests.get(url, headers=headers)
    if response.status_code != 200:
        print(f"Fehler beim Abrufen der Detailseite: {url}")
        return {}
    soup = BeautifulSoup(response.text, "html.parser")
    # Extract the website
    website_div = soup.find("div", class_="mod-Kontaktdaten__list-item contains-icon-big-homepage")
    website = website_div.find("a")["href"] if website_div and website_div.find("a") else "Keine Webseite"
    # Look for an e-mail address in a direct <a> tag
    email_tag = soup.find("a", href=lambda href: href and "mailto:" in href)
    email = email_tag["href"].replace("mailto:", "") if email_tag else "Keine E-Mail"
    # Alternatively take the e-mail from the "email_versenden" div
    email_div = soup.find("div", id="email_versenden")
    if email_div and email_div.get("data-link"):
        email_match = re.search(r"mailto:([^?]+)", email_div["data-link"])
        if email_match:
            email = email_match.group(1)
    # Extract the contact person
    contact_tag = soup.find("div", class_="mod-Ansprechpartner__name")
    contact = contact_tag.text.strip() if contact_tag else "Unbekannt"
    # Extract the branche (business category)
    branche_tag = soup.find("div", class_="mod-TeilnehmerKopf__branchen")
    branche_span = branche_tag.find("span", {"data-selenium": "teilnehmerkopf__branche"}) if branche_tag else None
    branche = branche_span.text.strip() if branche_span else "Keine Branche"
    return {
        "website": website,
        "email": email,
        "contact": contact,
        "branche": branche
    }

if __name__ == "__main__":
    conn = connect_to_database()
    cursor = conn.cursor()
    try:
        # Fetch cities and categories
        staedte = get_data_from_table(cursor, "staedte", "staedte")
        rubriken = get_data_from_table(cursor, "rubriken", "rubriken")
        for stadt in staedte:
            for rubrik in rubriken:
                results = scrape_gelbeseiten(rubrik, stadt)
                # Store the results, but only if the data-realid does not exist yet
                for result in results:
                    if not check_if_realid_exists(cursor, result["realid"]):
                        data = (
                            result["realid"],
                            result["name"],
                            result["website"],
                            result["email"],
                            result["street"],
                            result["plz"],
                            result["city"],
                            result["phone"],
                            result["contact"]
                        )
                        insert_into_gelbeseiten(cursor, data)
                    else:
                        print(f"Die RealID {result['realid']} existiert bereits, überspringe das Einfügen.")
                conn.commit()
    except Exception as e:
        print(f"Fehler: {e}")
    finally:
        cursor.close()
        conn.close()

scrapper_Adresse_v3.py Normal file

@@ -0,0 +1,193 @@
import requests
from bs4 import BeautifulSoup
import mysql.connector
import time
import re

# MySQL connection details
DB_HOST = "192.168.178.201"
DB_USER = "gelbeseiten"
DB_PASSWORD = "Gm4bBE62gXCSVVY2"
DB_NAME = "domainchecker"

def connect_to_database():
    return mysql.connector.connect(
        host=DB_HOST,
        user=DB_USER,
        password=DB_PASSWORD,
        database=DB_NAME
    )

def get_data_from_table(cursor, table_name, column_name):
    cursor.execute(f"SELECT `{column_name}` FROM `{table_name}` ORDER BY id DESC")
    return [row[0] for row in cursor.fetchall()]

def insert_into_gelbeseiten(cursor, data):
    sql = """
        INSERT INTO gelbeseiten
        (`data-realid`, `firmenname`, `domain`, `mailadresse`, `adresse`, `plz`, `ort`, `telefonnummer`, `ansprechpartner`, `branche`)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    cursor.execute(sql, data)

def check_if_realid_exists(cursor, realid):
    cursor.execute("SELECT 1 FROM gelbeseiten WHERE `data-realid` = %s", (realid,))
    return cursor.fetchone() is not None

def split_address(address):
    # Split the address into street + house number, PLZ and city
    address = address.strip().replace("\n", " ")
    # Assumption: the address has the format "Straße Hausnummer, PLZ Ort"
    # Example: "Musterstraße 1, 12345 Dresden"
    address_parts = re.split(r',\s*', address)
    if len(address_parts) == 2:
        street_and_number = address_parts[0].strip()
        plz_and_city = address_parts[1].strip()
        # Split PLZ and city
        plz_city_parts = re.split(r'\s+', plz_and_city)
        plz = plz_city_parts[0]
        city = " ".join(plz_city_parts[1:]) if len(plz_city_parts) > 1 else ""
        return street_and_number, plz, city
    else:
        # If the address does not match the expected format, return placeholder values
        return address, "Unbekannt", "Unbekannt"

def scrape_gelbeseiten(search_term, location, radius_km=50000):
    base_url = "https://www.gelbeseiten.de/suche"
    search_url = f"{base_url}/{search_term}/{location}?umkreis={radius_km}"
    print(f"Scrape {search_term} in {location}...")
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"
    }
    response = requests.get(search_url, headers=headers)
    if response.status_code != 200:
        print(f"Fehler beim Abrufen der Suchergebnisse für {search_term} in {location}.")
        return []
    soup = BeautifulSoup(response.text, "html.parser")
    articles = soup.find_all("article", class_="mod mod-Treffer")
    results = []
    for article in articles:
        try:
            realid = article.get("data-realid")
            if not realid:
                continue
            name_tag = article.find("h2", class_="mod-Treffer__name")
            name = name_tag.text.strip() if name_tag else "Unbekannt"
            address_tag = article.find("div", class_="mod-AdresseKompakt__adress-text")
            address = address_tag.text.strip().replace("\n", " ") if address_tag else "Keine Adresse"
            street, plz, city = split_address(address)
            phone_tag = article.find("a", class_="mod-TelefonnummerKompakt__phoneNumber")
            phone = phone_tag.text.strip() if phone_tag else "Keine Telefonnummer"
            detail_url = f"https://www.gelbeseiten.de/gsbiz/{realid}"
            detail_data = scrape_detail_page(detail_url, headers)
            results.append({
                "realid": realid,
                "name": name,
                "street": street,
                "plz": plz,
                "city": city,
                "phone": phone,
                "website": detail_data.get("website", "Keine Webseite"),
                "email": detail_data.get("email", "Keine E-Mail"),
                "contact": detail_data.get("contact", "Unbekannt"),
                "branche": detail_data.get("branche", "Keine Branche")
            })
            print(f"Gescrapt: {name}")
            time.sleep(1)
        except Exception as e:
            print(f"Fehler beim Verarbeiten eines Eintrags: {e}")
    return results

def scrape_detail_page(url, headers):
    """Scrape the detail page for website, e-mail, contact person and branche."""
    response = requests.get(url, headers=headers)
    if response.status_code != 200:
        print(f"Fehler beim Abrufen der Detailseite: {url}")
        return {}
    soup = BeautifulSoup(response.text, "html.parser")
    # Extract the website
    website_div = soup.find("div", class_="mod-Kontaktdaten__list-item contains-icon-big-homepage")
    website = website_div.find("a")["href"] if website_div and website_div.find("a") else "Keine Webseite"
    # Look for an e-mail address in a direct <a> tag
    email_tag = soup.find("a", href=lambda href: href and "mailto:" in href)
    email = email_tag["href"].replace("mailto:", "") if email_tag else "Keine E-Mail"
    # Alternatively take the e-mail from the "email_versenden" div
    email_div = soup.find("div", id="email_versenden")
    if email_div and email_div.get("data-link"):
        email_match = re.search(r"mailto:([^?]+)", email_div["data-link"])
        if email_match:
            email = email_match.group(1)
    # Extract the contact person
    contact_tag = soup.find("div", class_="mod-Ansprechpartner__name")
    contact = contact_tag.text.strip() if contact_tag else "Unbekannt"
    # Extract the branche (business category)
    branche_tag = soup.find("div", class_="mod-TeilnehmerKopf__branchen")
    branche_span = branche_tag.find("span", {"data-selenium": "teilnehmerkopf__branche"}) if branche_tag else None
    branche = branche_span.text.strip() if branche_span else "Keine Branche"
    return {
        "website": website,
        "email": email,
        "contact": contact,
        "branche": branche
    }

if __name__ == "__main__":
    conn = connect_to_database()
    cursor = conn.cursor()
    try:
        # Fetch cities and categories
        staedte = get_data_from_table(cursor, "staedte", "staedte")
        rubriken = get_data_from_table(cursor, "rubriken", "rubriken")
        for stadt in staedte:
            for rubrik in rubriken:
                results = scrape_gelbeseiten(rubrik, stadt)
                # Store the results, but only if the data-realid does not exist yet
                for result in results:
                    if not check_if_realid_exists(cursor, result["realid"]):
                        data = (
                            result["realid"],
                            result["name"],
                            result["website"],
                            result["email"],
                            result["street"],
                            result["plz"],
                            result["city"],
                            result["phone"],
                            result["contact"],
                            result["branche"]
                        )
                        insert_into_gelbeseiten(cursor, data)
                    else:
                        print(f"Die RealID {result['realid']} existiert bereits, überspringe das Einfügen.")
                conn.commit()
    except Exception as e:
        print(f"Fehler: {e}")
    finally:
        cursor.close()
        conn.close()

scrapper_adresse.py Normal file

@@ -0,0 +1,174 @@
import requests
from bs4 import BeautifulSoup
import mysql.connector
import time
import re

# MySQL connection details
DB_HOST = "192.168.178.201"
DB_USER = "gelbeseiten"
DB_PASSWORD = "Gm4bBE62gXCSVVY2"
DB_NAME = "domainchecker"

def connect_to_database():
    return mysql.connector.connect(
        host=DB_HOST,
        user=DB_USER,
        password=DB_PASSWORD,
        database=DB_NAME
    )

def get_data_from_table(cursor, table_name, column_name):
    cursor.execute(f"SELECT `{column_name}` FROM `{table_name}`")
    return [row[0] for row in cursor.fetchall()]

def insert_into_gelbeseiten(cursor, data):
    sql = """
        INSERT INTO gelbeseiten
        (`data-realid`, `firmenname`, `domain`, `mailadresse`, `adresse`, `plz`, `ort`, `telefonnummer`, `ansprechpartner`)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    cursor.execute(sql, data)

def check_if_realid_exists(cursor, realid):
    cursor.execute("SELECT 1 FROM gelbeseiten WHERE `data-realid` = %s", (realid,))
    return cursor.fetchone() is not None

def split_address(address):
    # Split the address into street + house number, PLZ and city
    address = address.strip().replace("\n", " ")
    # Assumption: the address has the format "Straße Hausnummer, PLZ Ort"
    # Example: "Musterstraße 1, 12345 Dresden"
    address_parts = re.split(r',\s*', address)
    if len(address_parts) == 2:
        street_and_number = address_parts[0].strip()
        plz_and_city = address_parts[1].strip()
        # Split PLZ and city
        plz_city_parts = re.split(r'\s+', plz_and_city)
        plz = plz_city_parts[0]
        city = " ".join(plz_city_parts[1:]) if len(plz_city_parts) > 1 else ""
        return street_and_number, plz, city
    else:
        # If the address does not match the expected format, return placeholder values
        return address, "Unbekannt", "Unbekannt"

def scrape_gelbeseiten(search_term, location, radius_km=50000):
    base_url = "https://www.gelbeseiten.de/suche"
    search_url = f"{base_url}/{search_term}/{location}?umkreis={radius_km}"
    print(f"Scrape {search_term} in {location}...")
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"
    }
    response = requests.get(search_url, headers=headers)
    if response.status_code != 200:
        print(f"Fehler beim Abrufen der Suchergebnisse für {search_term} in {location}.")
        return []
    soup = BeautifulSoup(response.text, "html.parser")
    articles = soup.find_all("article", class_="mod mod-Treffer")
    results = []
    for article in articles:
        try:
            realid = article.get("data-realid")
            if not realid:
                continue
            name_tag = article.find("h2", class_="mod-Treffer__name")
            name = name_tag.text.strip() if name_tag else "Unbekannt"
            address_tag = article.find("div", class_="mod-AdresseKompakt__adress-text")
            address = address_tag.text.strip().replace("\n", " ") if address_tag else "Keine Adresse"
            street, plz, city = split_address(address)
            phone_tag = article.find("a", class_="mod-TelefonnummerKompakt__phoneNumber")
            phone = phone_tag.text.strip() if phone_tag else "Keine Telefonnummer"
            detail_url = f"https://www.gelbeseiten.de/gsbiz/{realid}"
            detail_data = scrape_detail_page(detail_url, headers)
            results.append({
                "realid": realid,
                "name": name,
                "street": street,
                "plz": plz,
                "city": city,
                "phone": phone,
                "website": detail_data.get("website", "Keine Webseite"),
                "email": detail_data.get("email", "Keine E-Mail"),
                "contact": detail_data.get("contact", "Unbekannt"),
            })
            print(f"Gescrapt: {name}")
            time.sleep(1)
        except Exception as e:
            print(f"Fehler beim Verarbeiten eines Eintrags: {e}")
    return results

def scrape_detail_page(url, headers):
    response = requests.get(url, headers=headers)
    if response.status_code != 200:
        print(f"Fehler beim Abrufen der Detailseite: {url}")
        return {}
    soup = BeautifulSoup(response.text, "html.parser")
    # Extract website, e-mail and contact person from the detail page
    website_div = soup.find("div", class_="mod-Kontaktdaten__list-item contains-icon-big-homepage")
    website = website_div.find("a")["href"] if website_div and website_div.find("a") else "Keine Webseite"
    email_tag = soup.find("a", href=lambda href: href and "mailto:" in href)
    email = email_tag["href"].replace("mailto:", "") if email_tag else "Keine E-Mail"
    contact_tag = soup.find("div", class_="mod-Ansprechpartner__name")
    contact = contact_tag.text.strip() if contact_tag else "Unbekannt"
    return {
        "website": website,
        "email": email,
        "contact": contact
    }

if __name__ == "__main__":
    conn = connect_to_database()
    cursor = conn.cursor()
    try:
        # Fetch cities and categories
        staedte = get_data_from_table(cursor, "staedte", "staedte")
        rubriken = get_data_from_table(cursor, "rubriken", "rubriken")
        for stadt in staedte:
            for rubrik in rubriken:
                results = scrape_gelbeseiten(rubrik, stadt)
                # Store the results, but only if the data-realid does not exist yet
                for result in results:
                    if not check_if_realid_exists(cursor, result["realid"]):
                        data = (
                            result["realid"],
                            result["name"],
                            result["website"],
                            result["email"],
                            result["street"],
                            result["plz"],
                            result["city"],
                            result["phone"],
                            result["contact"]
                        )
                        insert_into_gelbeseiten(cursor, data)
                    else:
                        print(f"Die RealID {result['realid']} existiert bereits, überspringe das Einfügen.")
                conn.commit()
    except Exception as e:
        print(f"Fehler: {e}")
    finally:
        cursor.close()
        conn.close()