# Luxx/luxx/tools/services.py
#
# 81 lines
# 3.0 KiB
# Python

"""Tool helper services"""
import logging
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List
from urllib.parse import parse_qs, urlparse, quote

import httpx
from bs4 import BeautifulSoup
from ddgs import DDGS
class SearchService:
    """Web search backed by DuckDuckGo through the ``ddgs`` library."""

    def search(self, query: str, max_results: int = 5) -> List[dict]:
        """Run a text search and return normalized result records.

        Args:
            query: Search terms. A blank (empty or whitespace-only) query
                returns an empty list without calling the backend.
            max_results: Forwarded to ``DDGS().text`` as ``max_results``.

        Returns:
            A list of dicts with the keys ``title``, ``url`` and ``snippet``,
            filled from the ``title``, ``href`` and ``body`` fields of each
            backend result (a missing field becomes ``""``).

            The search is best-effort: if the backend raises, the failure is
            logged as a warning and whatever was collected so far (possibly
            nothing) is returned instead of propagating the exception.
        """
        if not query or not query.strip():
            return []

        results = []
        try:
            for result in DDGS().text(query, max_results=max_results):
                results.append({
                    "title": result.get("title", ""),
                    "url": result.get("href", ""),
                    "snippet": result.get("body", ""),
                })
        except Exception:
            # Callers rely on always getting a list back, so the error is not
            # re-raised; it is logged so failures are no longer invisible.
            logging.getLogger(__name__).warning(
                "Search failed for query %r", query, exc_info=True
            )
        return results
class FetchService:
    """Fetch web pages over HTTP and extract text, links or metadata.

    ``fetch`` handles a single page; ``fetch_batch`` fetches several URLs
    concurrently on a thread pool and returns the results in input order.
    """

    # Upper bound on the number of worker threads fetch_batch will use,
    # regardless of the max_concurrent value requested by the caller.
    MAX_WORKERS = 5

    def __init__(self, timeout: float = 15.0):
        """Store the request settings.

        Args:
            timeout: Timeout value passed to ``httpx.get`` for every request.
        """
        self.timeout = timeout
        # Sent as the User-Agent header on every request.
        self.user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"

    def fetch(self, url: str, extract_type: str = "text") -> dict:
        """Download one page and extract content from its HTML.

        Args:
            url: Page address. Surrounding whitespace is removed and
                ``https://`` is prepended when the value does not already
                start with ``http://`` or ``https://`` (case-insensitive).
            extract_type: Selects the shape of the result:
                ``"links"`` returns at most 50 anchors as ``text``/``url``
                pairs; ``"structured"`` returns title, meta description and
                at most 5000 characters of text; any other value returns the
                title and at most 15000 characters of text.

        Returns:
            On success a dict that always contains ``url`` plus the fields
            described above. When the request times out or fails (including
            a non-success HTTP status), a dict of the form
            ``{"error": message}``.
        """
        url = url.strip()
        if not url.lower().startswith(("http://", "https://")):
            url = "https://" + url

        try:
            resp = httpx.get(
                url,
                timeout=self.timeout,
                follow_redirects=True,
                headers={"User-Agent": self.user_agent},
            )
            resp.raise_for_status()
        except httpx.TimeoutException:
            return {"error": "Request timeout"}
        except Exception as e:
            return {"error": str(e)}

        soup = BeautifulSoup(resp.text, "html.parser")
        # ``.string`` is None when <title> is empty or has several children;
        # get_text() always yields a plain string, so ``title`` is never None.
        title = soup.title.get_text(strip=True) if soup.title else ""

        # Remove noise: scripts, styles and page chrome are dropped before
        # any links or text are extracted.
        for tag in soup(["script", "style", "nav", "footer", "header", "aside"]):
            tag.decompose()

        if extract_type == "links":
            # Keep only anchors that have visible text and a real target.
            links = [
                {"text": a.get_text(strip=True), "url": a["href"]}
                for a in soup.find_all("a", href=True)
                if a.get_text(strip=True)
                and not a["href"].startswith(("#", "javascript:"))
            ]
            return {"url": url, "links": links[:50]}

        # Collapse runs of three or more newlines into a single blank line.
        text = re.sub(r"\n{3,}", "\n\n", soup.get_text(separator="\n", strip=True))

        if extract_type == "structured":
            meta_desc = soup.find("meta", attrs={"name": "description"})
            return {
                "url": url,
                "title": title,
                "description": (meta_desc.get("content", "") if meta_desc else ""),
                "text": text[:5000],
            }

        return {"url": url, "title": title, "text": text[:15000]}

    def fetch_batch(self, urls: List[str], extract_type: str = "text", max_concurrent: int = 5) -> List[dict]:
        """Fetch several URLs and return one result per URL, in input order.

        Args:
            urls: Addresses to fetch.
            extract_type: Forwarded unchanged to ``fetch`` for every URL.
            max_concurrent: Requested number of worker threads; clamped to
                the range ``1..MAX_WORKERS``.

        Returns:
            A list with the same length and order as ``urls``. Each entry is
            whatever ``fetch`` returned for that URL, or ``{"error": message}``
            if fetching that URL raised unexpectedly, so one bad page cannot
            discard the results of the others.
        """
        if len(urls) <= 1:
            # Nothing to parallelize; skip the thread pool entirely.
            return [self._fetch_safely(url, extract_type) for url in urls]

        workers = min(max(max_concurrent, 1), self.MAX_WORKERS)
        with ThreadPoolExecutor(max_workers=workers) as pool:
            # Executor.map yields results in the order of its inputs.
            return list(pool.map(lambda u: self._fetch_safely(u, extract_type), urls))

    def _fetch_safely(self, url: str, extract_type: str) -> dict:
        """Call ``fetch`` and turn any exception it raises into an error dict."""
        try:
            return self.fetch(url, extract_type)
        except Exception as exc:
            return {"error": str(exc)}