"""Tool helper services."""

import logging
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List
from urllib.parse import parse_qs, quote, urlparse

import httpx
from bs4 import BeautifulSoup
from ddgs import DDGS

logger = logging.getLogger(__name__)


class SearchService:
    """DuckDuckGo search using the ddgs library."""

    def search(self, query: str, max_results: int = 5) -> List[dict]:
        """Run a text search and return normalized result dicts.

        Args:
            query: Search query passed to ``DDGS().text``.
            max_results: Forwarded to ``DDGS().text`` as ``max_results``.

        Returns:
            A list of dicts with the keys ``"title"``, ``"url"`` and
            ``"snippet"``. The search is best-effort: if the backend raises,
            the failure is logged and the results collected so far (possibly
            none) are returned instead of propagating the exception.
        """
        results: List[dict] = []
        try:
            # Deliberately an append loop rather than a comprehension: results
            # gathered before a mid-iteration failure must still be returned.
            for result in DDGS().text(query, max_results=max_results):
                results.append({
                    "title": result.get("title", ""),
                    "url": result.get("href", ""),
                    "snippet": result.get("body", ""),
                })
        except Exception:
            # Best-effort boundary: keep the partial results, but leave a trace
            # instead of swallowing the failure silently.
            logger.warning("Search failed for query %r", query, exc_info=True)
        return results


class FetchService:
    """Page fetch using ddgs with concurrent support."""

    # Maximum number of characters of page text kept per fetched page.
    MAX_TEXT_CHARS = 15000
    # Hard upper bound on worker threads used by fetch_batch.
    MAX_WORKERS = 5

    def __init__(self, timeout: float = 15.0):
        """Store the configured timeout.

        Args:
            timeout: Timeout value kept on the instance.
                NOTE(review): nothing in this file reads ``self.timeout``;
                confirm whether it should be passed to the ddgs client.
        """
        self.timeout = timeout

    def fetch(self, url: str, extract_type: str = "text") -> dict:
        """Fetch a single page and return its title and (truncated) text.

        Args:
            url: Page address. ``https://`` is prepended when the value does
                not already start with an http/https scheme (checked
                case-insensitively, after trimming surrounding whitespace).
            extract_type: Accepted for interface compatibility; not used by
                this implementation.

        Returns:
            On success ``{"url", "title", "text"}`` with ``text`` limited to
            ``MAX_TEXT_CHARS`` characters. When the backend returns no
            results, the same keys with empty ``title`` and ``text``. When the
            backend raises, ``{"url", "error"}`` with the exception message.
        """
        url = url.strip()
        # Case-insensitive so "HTTP://host" is not given a second scheme.
        if not url.lower().startswith(("http://", "https://")):
            url = "https://" + url
        try:
            # Assumes the response is a mapping with a "results" list of
            # page dicts, as the original code did -- TODO confirm with ddgs.
            result = DDGS().fetch(url)
            if result and result.get("results"):
                page_data = result["results"][0]
                return {
                    "url": url,
                    # "or" guards against keys that are present but None.
                    "title": page_data.get("title") or "",
                    "text": (page_data.get("text") or "")[:self.MAX_TEXT_CHARS],
                }
        except Exception as e:
            # Include the URL so batch callers can tell which fetch failed.
            return {"url": url, "error": str(e)}
        return {"url": url, "title": "", "text": ""}

    def _fetch_safely(self, url: str, extract_type: str) -> dict:
        """Call ``fetch`` and turn any escaping exception into an error dict."""
        try:
            return self.fetch(url, extract_type)
        except Exception as e:
            # e.g. a non-string entry fails before fetch's own try block.
            return {"url": url, "error": str(e)}

    def fetch_batch(self, urls: List[str], extract_type: str = "text",
                    max_concurrent: int = 5) -> List[dict]:
        """Fetch several pages, using a thread pool when there is more than one.

        Args:
            urls: Page addresses to fetch.
            extract_type: Forwarded to ``fetch``.
            max_concurrent: Requested number of worker threads; clamped to the
                range 1..``MAX_WORKERS`` and never more than ``len(urls)``.

        Returns:
            One result dict per input URL, in the same order as ``urls``.
            A failure for one URL yields an error dict for that entry and
            does not abort the rest of the batch.
        """
        if len(urls) <= 1:
            # Nothing to parallelize; skip the thread pool entirely.
            return [self._fetch_safely(url, extract_type) for url in urls]

        workers = min(max(max_concurrent, 1), self.MAX_WORKERS, len(urls))
        with ThreadPoolExecutor(max_workers=workers) as pool:
            futures = [
                pool.submit(self._fetch_safely, url, extract_type)
                for url in urls
            ]
            # Collecting in submission order keeps results aligned with urls.
            return [future.result() for future in futures]