Luxx/luxx/tools/services.py

303 lines
8.9 KiB
Python

"""Tool helper services"""
import re
import httpx
from urllib.parse import quote
from typing import List
from concurrent.futures import ThreadPoolExecutor, as_completed
from bs4 import BeautifulSoup
from ddgs import DDGS
from curl_cffi import requests as curl_requests
class SearchService:
"""Search service supporting multiple engines"""
def __init__(self, engine: str = "bing"):
self.engine = engine
def search(
self,
query: str,
max_results: int = 5,
region: str = "cn-zh"
) -> List[dict]:
"""
Execute search
Args:
query: Search keywords
max_results: Max result count
region: Region setting
Returns:
Search result list
"""
if self.engine == "duckduckgo":
return self._search_duckduckgo(query, max_results, region)
elif self.engine == "bing":
return self._search_bing(query, max_results, region)
else:
raise ValueError(f"Unsupported search engine: {self.engine}")
def _search_duckduckgo(
self,
query: str,
max_results: int,
region: str
) -> List[dict]:
"""DuckDuckGo search with region support"""
with DDGS() as ddgs:
results = list(ddgs.text(
query,
max_results=max_results,
region=region
))
return [
{
"title": r.get("title", ""),
"url": r.get("href", ""),
"snippet": r.get("body", "")
}
for r in results
]
def _search_bing(
self,
query: str,
max_results: int,
region: str
) -> List[dict]:
"""Bing search using curl-cffi to simulate browser"""
# Map region to Bing market code
market_map = {
"cn-zh": "zh-CN",
"us-en": "en-US",
}
market = market_map.get(region, "en-US")
results = []
offset = 0
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": f"{market},en;q=0.5",
"Accept-Encoding": "gzip, deflate, br",
"Connection": "keep-alive",
}
while len(results) < max_results:
url = f"https://www.bing.com/search?q={quote(query)}&first={offset}&mkt={market}"
try:
response = curl_requests.get(
url,
headers=headers,
impersonate="chrome",
timeout=15
)
if response.status_code != 200:
break
soup = BeautifulSoup(response.text, "html.parser")
# Find search result items
for item in soup.select("li.b_algo"):
title_elem = item.select_one("h2 a")
snippet_elem = item.select_one("div.b_paractl")
cite_elem = item.select_one("cite")
if title_elem:
title = title_elem.get_text(strip=True)
url = title_elem.get("href", "")
# Get snippet
snippet = ""
if snippet_elem:
snippet = snippet_elem.get_text(strip=True)
elif cite_elem:
snippet = cite_elem.get_text(strip=True)
results.append({
"title": title,
"url": url,
"snippet": snippet
})
if len(results) >= max_results:
break
# Check if there are more results
next_page = soup.select_one("a.sb_pagN")
if not next_page or len(results) >= max_results:
break
offset += 10
except Exception as e:
break
return results[:max_results]
class FetchService:
"""Page fetch service with content extraction support"""
def __init__(self, timeout: float = 30.0, user_agent: str = None):
self.timeout = timeout
self.user_agent = user_agent or (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/120.0.0.0 Safari/537.36"
)
def fetch(
self,
url: str,
extract_type: str = "text"
) -> dict:
"""
Fetch a single page
Args:
url: Page URL
extract_type: Extract type (text, links, structured)
Returns:
Fetch result
"""
if not url.startswith(("http://", "https://")):
url = "https://" + url
try:
resp = curl_requests.get(
url,
timeout=self.timeout,
impersonate="chrome",
headers={
"User-Agent": self.user_agent,
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
"Accept-Encoding": "gzip, deflate, br",
"Connection": "keep-alive",
}
)
resp.raise_for_status()
except Exception as e:
return {"error": str(e), "url": url}
html = resp.text
extractor = ContentExtractor(html)
if extract_type == "text":
return {
"url": url,
"text": extractor.extract_text()
}
elif extract_type == "links":
return {
"url": url,
"links": extractor.extract_links()
}
else:
return extractor.extract_structured(url)
def fetch_batch(
self,
urls: List[str],
extract_type: str = "text",
max_concurrent: int = 5
) -> List[dict]:
"""
Batch fetch pages concurrently.
Args:
urls: URL list
extract_type: Extract type
max_concurrent: Max concurrent requests (1-5, default 5)
Returns:
Result list (same order as input URLs)
"""
if len(urls) <= 1:
return [self.fetch(url, extract_type) for url in urls]
max_concurrent = min(max(max_concurrent, 1), 5)
results = [None] * len(urls)
with ThreadPoolExecutor(max_workers=max_concurrent) as pool:
futures = {
pool.submit(self.fetch, url, extract_type): i
for i, url in enumerate(urls)
}
for future in as_completed(futures):
idx = futures[future]
try:
results[idx] = future.result()
except Exception as e:
results[idx] = {"error": str(e)}
return results
class ContentExtractor:
"""Content extractor for HTML pages"""
def __init__(self, html: str):
self.html = html
self._soup = None
@property
def soup(self):
if self._soup is None:
self._soup = BeautifulSoup(self.html, "html.parser")
return self._soup
def extract_text(self) -> str:
"""Extract plain text"""
# Remove script, style, nav, footer, header
for tag in self.soup(["script", "style", "nav", "footer", "header"]):
tag.decompose()
text = self.soup.get_text(separator="\n", strip=True)
# Clean extra whitespace
text = re.sub(r"\n{3,}", "\n\n", text)
return text
def extract_links(self) -> List[dict]:
"""Extract links"""
links = []
for a in self.soup.find_all("a", href=True):
text = a.get_text(strip=True)
href = a["href"]
if text and href and not href.startswith(("#", "javascript:")):
links.append({"text": text, "href": href})
return links[:50] # Limit count
def extract_structured(self, url: str = "") -> dict:
"""Extract structured content"""
soup = self.soup
# Extract title
title = ""
if soup.title:
title = soup.title.string or ""
# Extract meta description
description = ""
meta_desc = soup.find("meta", attrs={"name": "description"})
if meta_desc:
description = meta_desc.get("content", "")
return {
"url": url,
"title": title.strip() if title else "",
"description": description.strip(),
"text": self.extract_text()[:5000],
"links": self.extract_links()[:20]
}