# Luxx/luxx/tools/services.py
"""Tool helper services"""
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List
from urllib.parse import parse_qs, quote, urlparse

import httpx
class SearchService:
    """Web search service, currently backed by DuckDuckGo's HTML endpoint."""

    def __init__(self, engine: str = "duckduckgo"):
        # Engine name is validated lazily in search(), not at construction.
        self.engine = engine

    def search(
        self,
        query: str,
        max_results: int = 5,
        region: str = "cn-zh"
    ) -> List[dict]:
        """
        Execute search

        Args:
            query: Search keywords
            max_results: Max result count
            region: Region setting (DuckDuckGo "kl" code, e.g. "cn-zh", "us-en")

        Returns:
            Search result list: dicts with "title", "url" and "snippet" keys

        Raises:
            ValueError: If the configured engine is not supported
        """
        if self.engine == "duckduckgo":
            return self._search_duckduckgo(query, max_results, region)
        raise ValueError(f"Unsupported search engine: {self.engine}")

    def _search_duckduckgo(
        self,
        query: str,
        max_results: int,
        region: str
    ) -> List[dict]:
        """DuckDuckGo search via the JS-free HTML endpoint."""
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
            "Accept": "text/html,application/xhtml+xml",
        }
        # FIX: `region` was previously accepted but never sent with the
        # request. DuckDuckGo's HTML endpoint reads it from the "kl" parameter.
        url = (
            "https://html.duckduckgo.com/html/"
            f"?q={quote(query)}&kl={quote(region)}"
        )
        try:
            resp = httpx.get(url, headers=headers, timeout=15, follow_redirects=True)
            resp.raise_for_status()
        except httpx.HTTPError:
            # Best-effort: network/HTTP failures degrade to an empty result
            # list (narrowed from a bare `except Exception`).
            return []
        from bs4 import BeautifulSoup  # third-party; imported lazily as before
        soup = BeautifulSoup(resp.text, "html.parser")
        results = []
        for result in soup.select(".result")[:max_results]:
            title_elem = result.select_one(".result__title a")
            snippet_elem = result.select_one(".result__snippet")
            if not title_elem:
                # Skip result cards with no title link (e.g. ad/empty blocks).
                continue
            raw_url = title_elem.get("href", "")
            # DuckDuckGo wraps target URLs in a redirect; unwrap the real
            # destination from the "uddg" query parameter when present.
            if "uddg=" in raw_url:
                parsed = urlparse(raw_url)
                params = parse_qs(parsed.query)
                clean_url = params.get("uddg", [raw_url])[0]
            else:
                clean_url = raw_url
            results.append({
                "title": title_elem.get_text(strip=True),
                "url": clean_url,
                "snippet": snippet_elem.get_text(strip=True) if snippet_elem else ""
            })
        return results
class FetchService:
    """Fetches web pages over HTTP and extracts their content."""

    def __init__(self, timeout: float = 15.0):
        # Per-request timeout in seconds.
        self.timeout = timeout
        # Desktop-browser UA string; some sites refuse default client UAs.
        self.user_agent = (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/120.0.0.0 Safari/537.36"
        )

    def fetch(
        self,
        url: str,
        extract_type: str = "text"
    ) -> dict:
        """
        Fetch a single page.

        Args:
            url: Page URL (scheme optional; https:// is assumed if missing)
            extract_type: One of "text", "links"; anything else yields
                the structured extraction

        Returns:
            Extraction result dict, or {"error": ..., "url": ...} on failure
        """
        # Assume HTTPS when the caller omitted a scheme.
        if not url.startswith(("http://", "https://")):
            url = "https://" + url

        try:
            response = httpx.get(
                url,
                timeout=self.timeout,
                follow_redirects=True,
                headers={"User-Agent": self.user_agent},
            )
            response.raise_for_status()
        except httpx.TimeoutException:
            return {"error": "Request timeout", "url": url}
        except Exception as exc:
            return {"error": str(exc), "url": url}

        extractor = ContentExtractor(response.text)

        if extract_type == "links":
            return {"url": url, "links": extractor.extract_links()}
        if extract_type == "text":
            return {
                "url": url,
                "title": extractor.extract_title(),
                "text": extractor.extract_text()[:15000],
            }
        # Any other value falls through to the structured extraction.
        return extractor.extract_structured(url)

    def fetch_batch(
        self,
        urls: List[str],
        extract_type: str = "text",
        max_concurrent: int = 5
    ) -> List[dict]:
        """
        Batch fetch pages concurrently.

        Args:
            urls: URL list
            extract_type: Extract type
            max_concurrent: Max concurrent requests (clamped to 1-5)

        Returns:
            Result list in the same order as the input URLs
        """
        # Zero or one URL: no point spinning up a thread pool.
        if len(urls) <= 1:
            return [self.fetch(target, extract_type) for target in urls]

        workers = max(1, min(max_concurrent, 5))
        ordered: List[dict] = [None] * len(urls)
        with ThreadPoolExecutor(max_workers=workers) as executor:
            pending = {}
            for position, target in enumerate(urls):
                future = executor.submit(self.fetch, target, extract_type)
                pending[future] = position
            # Collect as each finishes; slot results back by input position.
            for done in as_completed(pending):
                slot = pending[done]
                try:
                    ordered[slot] = done.result()
                except Exception as exc:
                    ordered[slot] = {"error": str(exc)}
        return ordered
class ContentExtractor:
    """Content extractor using BeautifulSoup.

    Wraps a raw HTML string and lazily parses it on first access.

    NOTE: extract_text() destructively removes script/style/nav/footer/
    header/aside tags from the shared cached soup. A later extract_links()
    call on the same instance (as happens inside extract_structured(),
    whose dict values evaluate text before links) therefore only sees
    links outside those removed regions.
    """

    def __init__(self, html: str):
        # Raw HTML; parsing is deferred until the `soup` property is read.
        self.html = html
        self._soup = None

    @property
    def soup(self):
        """Lazily parse and cache the HTML (stdlib html.parser backend)."""
        if self._soup is None:
            from bs4 import BeautifulSoup
            self._soup = BeautifulSoup(self.html, "html.parser")
        return self._soup

    def extract_title(self) -> str:
        """Extract the <title> text; "" if absent or non-string content."""
        if self.soup.title:
            # .string is None when <title> has nested markup; fall back to "".
            return self.soup.title.string or ""
        return ""

    def extract_text(self) -> str:
        """Extract plain text, dropping boilerplate tags.

        WARNING: mutates the cached soup in place (see class docstring).
        """
        # Remove script and style (plus common chrome: nav/footer/header/aside).
        for tag in self.soup(["script", "style", "nav", "footer", "header", "aside"]):
            tag.decompose()
        text = self.soup.get_text(separator="\n", strip=True)
        # Clean extra whitespace: collapse 3+ consecutive newlines to 2.
        text = re.sub(r"\n{3,}", "\n\n", text)
        return text

    def extract_links(self, max_count: int = 50) -> List[dict]:
        """Extract up to max_count links as {"text", "url"} dicts.

        Skips anchors, javascript:, mailto: and tel: hrefs, and links
        with empty text. URLs are returned as-is (may be relative).
        """
        links = []
        for a in self.soup.find_all("a", href=True):
            text = a.get_text(strip=True)
            href = a["href"]
            if text and href and not href.startswith(("#", "javascript:", "mailto:", "tel:")):
                links.append({"text": text, "url": href})
                if len(links) >= max_count:
                    break
        return links

    def extract_structured(self, url: str = "") -> dict:
        """Extract structured content: title, meta description, text, links.

        The dict literal below evaluates extract_text() before
        extract_links(), so the links reflect the pruned soup.
        """
        soup = self.soup
        # Extract title
        title = ""
        if soup.title:
            title = soup.title.string or ""
        # Extract meta description
        description = ""
        meta_desc = soup.find("meta", attrs={"name": "description"})
        if meta_desc:
            description = meta_desc.get("content", "")
        return {
            "url": url,
            "title": title.strip(),
            "description": description.strip(),
            "text": self.extract_text()[:5000],
            "links": self.extract_links(20)
        }