"""Flask API blueprint: conversation/message CRUD, token-usage stats, and
sync + SSE-streaming chat completion against a GLM-compatible upstream API."""

import uuid
import json
import os
import requests
from datetime import datetime, date, timedelta
from flask import request, jsonify, Response, Blueprint, current_app

from . import db
from .models import Conversation, Message, User, TokenUsage
from . import load_config
from .tools import registry, ToolExecutor

bp = Blueprint("api", __name__)

cfg = load_config()
API_URL = cfg.get("api_url")
API_KEY = cfg["api_key"]
MODELS = cfg.get("models", [])
DEFAULT_MODEL = cfg.get("default_model", "glm-5")


# -- Helpers ----------------------------------------------

def get_or_create_default_user():
    """Return the single-tenant 'default' user, creating it on first use."""
    user = User.query.filter_by(username="default").first()
    if not user:
        user = User(username="default", password="")
        db.session.add(user)
        db.session.commit()
    return user


def ok(data=None, message=None):
    """Success envelope: {"code": 0, "data": ..., "message": ...}."""
    body = {"code": 0}
    if data is not None:
        body["data"] = data
    if message is not None:
        body["message"] = message
    return jsonify(body)


def err(code, message):
    """Error envelope; the app-level code doubles as the HTTP status code."""
    return jsonify({"code": code, "message": message}), code


def to_dict(inst, **extra):
    """Serialize a SQLAlchemy model row to a JSON-safe dict.

    Timestamps become "%Y-%m-%dT%H:%M:%SZ" strings, the tool_calls JSON text
    column is decoded, None-valued keys are dropped for a cleaner API
    response, and **extra is merged in last (so it can override columns).
    """
    d = {c.name: getattr(inst, c.name) for c in inst.__table__.columns}
    for k in ("created_at", "updated_at"):
        if k in d and hasattr(d[k], "strftime"):
            d[k] = d[k].strftime("%Y-%m-%dT%H:%M:%SZ")
    # Parse tool_calls JSON if present
    if "tool_calls" in d and d["tool_calls"]:
        try:
            d["tool_calls"] = json.loads(d["tool_calls"])
        except (TypeError, ValueError):
            # FIX: was a bare `except:`. Leave the raw value if not valid JSON.
            pass
    # Filter out None values for cleaner API response
    d = {k: v for k, v in d.items() if v is not None}
    d.update(extra)
    return d


def record_token_usage(user_id, model, prompt_tokens, completion_tokens):
    """Accumulate token usage into the per-user/per-model/per-day row."""
    today = date.today()
    usage = TokenUsage.query.filter_by(
        user_id=user_id, date=today, model=model
    ).first()
    if usage:
        usage.prompt_tokens += prompt_tokens
        usage.completion_tokens += completion_tokens
        usage.total_tokens += prompt_tokens + completion_tokens
    else:
        usage = TokenUsage(
            user_id=user_id,
            date=today,
            model=model,
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            total_tokens=prompt_tokens + completion_tokens,
        )
        db.session.add(usage)
    # Commit on BOTH branches so in-place increments are persisted too.
    db.session.commit()


def build_glm_messages(conv):
    """Build the OpenAI-style messages list for the GLM API from a conversation."""
    msgs = []
    if conv.system_prompt:
        msgs.append({"role": "system", "content": conv.system_prompt})
    # Query messages directly to avoid detached instance warning
    messages = (
        Message.query.filter_by(conversation_id=conv.id)
        .order_by(Message.created_at.asc())
        .all()
    )
    for m in messages:
        msgs.append({"role": m.role, "content": m.content})
    return msgs


# -- Models API -------------------------------------------

@bp.route("/api/models", methods=["GET"])
def list_models():
    """Get available model list"""
    return ok(MODELS)


# -- Tools API --------------------------------------------

@bp.route("/api/tools", methods=["GET"])
def list_tools():
    """Get available tool list"""
    tools = registry.list_all()
    return ok({
        "tools": tools,
        "total": len(tools)
    })


# -- Token Usage Statistics --------------------------------

def _daily_breakdown(stats, today, days):
    """Sum usage per ISO date over the trailing *days* days, zero-filling gaps."""
    daily = {}
    for s in stats:
        d = s.date.isoformat()
        if d not in daily:
            daily[d] = {"prompt": 0, "completion": 0, "total": 0}
        daily[d]["prompt"] += s.prompt_tokens
        daily[d]["completion"] += s.completion_tokens
        daily[d]["total"] += s.total_tokens
    # Fill missing dates so clients always see a dense series.
    for i in range(days):
        d = (today - timedelta(days=days - 1 - i)).isoformat()
        if d not in daily:
            daily[d] = {"prompt": 0, "completion": 0, "total": 0}
    return daily


@bp.route("/api/stats/tokens", methods=["GET"])
def token_stats():
    """Get token usage statistics for ?period=daily|weekly|monthly."""
    user = get_or_create_default_user()
    period = request.args.get("period", "daily")  # daily, weekly, monthly
    today = date.today()

    if period == "daily":
        # Today's statistics, broken down by model.
        stats = TokenUsage.query.filter_by(user_id=user.id, date=today).all()
        result = {
            "period": "daily",
            "date": today.isoformat(),
            "prompt_tokens": sum(s.prompt_tokens for s in stats),
            "completion_tokens": sum(s.completion_tokens for s in stats),
            "total_tokens": sum(s.total_tokens for s in stats),
            "by_model": {
                s.model: {
                    "prompt": s.prompt_tokens,
                    "completion": s.completion_tokens,
                    "total": s.total_tokens,
                }
                for s in stats
            },
        }
    elif period in ("weekly", "monthly"):
        # Trailing-window statistics: last 7 or last 30 days inclusive.
        days = 7 if period == "weekly" else 30
        start_date = today - timedelta(days=days - 1)
        stats = TokenUsage.query.filter(
            TokenUsage.user_id == user.id,
            TokenUsage.date >= start_date,
            TokenUsage.date <= today,
        ).all()
        result = {
            "period": period,
            "start_date": start_date.isoformat(),
            "end_date": today.isoformat(),
            "prompt_tokens": sum(s.prompt_tokens for s in stats),
            "completion_tokens": sum(s.completion_tokens for s in stats),
            "total_tokens": sum(s.total_tokens for s in stats),
            "daily": _daily_breakdown(stats, today, days),
        }
    else:
        return err(400, "invalid period")
    return ok(result)


# -- Conversation CRUD ------------------------------------

@bp.route("/api/conversations", methods=["GET", "POST"])
def conversation_list():
    """POST: create a conversation. GET: cursor-paginated list, newest first."""
    if request.method == "POST":
        d = request.json or {}
        user = get_or_create_default_user()
        conv = Conversation(
            id=str(uuid.uuid4()),
            user_id=user.id,
            title=d.get("title", ""),
            model=d.get("model", DEFAULT_MODEL),
            system_prompt=d.get("system_prompt", ""),
            temperature=d.get("temperature", 1.0),
            max_tokens=d.get("max_tokens", 65536),
            thinking_enabled=d.get("thinking_enabled", False),
        )
        db.session.add(conv)
        db.session.commit()
        return ok(to_dict(conv))

    cursor = request.args.get("cursor")
    limit = min(int(request.args.get("limit", 20)), 100)
    user = get_or_create_default_user()
    q = Conversation.query.filter_by(user_id=user.id)
    if cursor:
        # Keyset pagination on updated_at. FIX: datetime.utcnow was missing
        # its call parentheses, comparing the column to a function object.
        cursor_ts = (
            db.session.query(Conversation.updated_at).filter_by(id=cursor).scalar()
            or datetime.utcnow()
        )
        q = q.filter(Conversation.updated_at < cursor_ts)
    # Fetch one extra row to detect whether another page exists.
    rows = q.order_by(Conversation.updated_at.desc()).limit(limit + 1).all()
    items = [to_dict(r, message_count=r.messages.count()) for r in rows[:limit]]
    return ok({
        "items": items,
        "next_cursor": items[-1]["id"] if len(rows) > limit else None,
        "has_more": len(rows) > limit,
    })


@bp.route("/api/conversations/<conv_id>", methods=["GET", "PATCH", "DELETE"])
def conversation_detail(conv_id):
    """Fetch, update (PATCH of whitelisted fields), or delete one conversation.

    FIX: the route was missing the <conv_id> URL converter the view requires.
    """
    conv = db.session.get(Conversation, conv_id)
    if not conv:
        return err(404, "conversation not found")
    if request.method == "GET":
        return ok(to_dict(conv))
    if request.method == "DELETE":
        db.session.delete(conv)
        db.session.commit()
        return ok(message="deleted")
    # PATCH: only known, safe fields are writable.
    d = request.json or {}
    for k in ("title", "model", "system_prompt", "temperature",
              "max_tokens", "thinking_enabled"):
        if k in d:
            setattr(conv, k, d[k])
    db.session.commit()
    return ok(to_dict(conv))


# -- Messages ---------------------------------------------

@bp.route("/api/conversations/<conv_id>/messages", methods=["GET", "POST"])
def message_list(conv_id):
    """GET: cursor-paginated messages (oldest first). POST: send a user
    message and return the assistant reply (sync JSON or SSE stream).

    FIX: the route was missing the <conv_id> URL converter the view requires.
    """
    conv = db.session.get(Conversation, conv_id)
    if not conv:
        return err(404, "conversation not found")

    if request.method == "GET":
        cursor = request.args.get("cursor")
        limit = min(int(request.args.get("limit", 50)), 100)
        q = Message.query.filter_by(conversation_id=conv_id)
        if cursor:
            # FIX: datetime.utcnow was missing its call parentheses here too.
            cursor_ts = (
                db.session.query(Message.created_at).filter_by(id=cursor).scalar()
                or datetime.utcnow()
            )
            q = q.filter(Message.created_at < cursor_ts)
        rows = q.order_by(Message.created_at.asc()).limit(limit + 1).all()
        items = [to_dict(r) for r in rows[:limit]]
        return ok({
            "items": items,
            "next_cursor": items[-1]["id"] if len(rows) > limit else None,
            "has_more": len(rows) > limit,
        })

    d = request.json or {}
    content = (d.get("content") or "").strip()
    if not content:
        return err(400, "content is required")
    user_msg = Message(id=str(uuid.uuid4()), conversation_id=conv_id,
                       role="user", content=content)
    db.session.add(user_msg)
    db.session.commit()

    tools_enabled = d.get("tools_enabled", True)
    if d.get("stream", False):
        return _stream_response(conv, tools_enabled)
    return _sync_response(conv, tools_enabled)


@bp.route("/api/conversations/<conv_id>/messages/<msg_id>", methods=["DELETE"])
def delete_message(conv_id, msg_id):
    """Delete one message, verifying it belongs to the given conversation.

    FIX: the route was missing the <conv_id>/<msg_id> URL converters.
    """
    conv = db.session.get(Conversation, conv_id)
    if not conv:
        return err(404, "conversation not found")
    msg = db.session.get(Message, msg_id)
    if not msg or msg.conversation_id != conv_id:
        return err(404, "message not found")
    db.session.delete(msg)
    db.session.commit()
    return ok(message="deleted")


# -- Chat Completion ----------------------------------

def _call_glm(conv, stream=False, tools=None, messages=None):
    """POST a chat-completion request to the upstream GLM API.

    Returns the `requests.Response` (stream=True keeps the body unconsumed
    for SSE iteration). Raises requests exceptions on network failure.
    """
    body = {
        "model": conv.model,
        "messages": messages if messages is not None else build_glm_messages(conv),
        "max_tokens": conv.max_tokens,
        "temperature": conv.temperature,
    }
    if conv.thinking_enabled:
        body["thinking"] = {"type": "enabled"}
    if tools:
        body["tools"] = tools
        body["tool_choice"] = "auto"
    if stream:
        body["stream"] = True
    return requests.post(
        API_URL,
        headers={"Content-Type": "application/json",
                 "Authorization": f"Bearer {API_KEY}"},
        json=body,
        stream=stream,
        timeout=120,
    )


def _sync_response(conv, tools_enabled=True):
    """Sync response with tool call support.

    Loops up to 5 rounds: call the model, execute any requested tools,
    feed the results back, and stop at the first reply without tool calls.
    The final assistant message is persisted with all tool calls (and their
    results) merged into its tool_calls column.
    """
    executor = ToolExecutor(registry=registry)
    tools = registry.list_all() if tools_enabled else None
    messages = build_glm_messages(conv)
    max_iterations = 5  # Max tool call iterations

    # Collect all tool calls and results across iterations.
    all_tool_calls = []
    all_tool_results = []

    for _ in range(max_iterations):
        try:
            resp = _call_glm(conv, tools=tools, messages=messages)
            resp.raise_for_status()
            result = resp.json()
        except Exception as e:
            return err(500, f"upstream error: {e}")

        choice = result["choices"][0]
        message = choice["message"]

        # If no tool calls, return final result
        if not message.get("tool_calls"):
            usage = result.get("usage", {})
            prompt_tokens = usage.get("prompt_tokens", 0)
            completion_tokens = usage.get("completion_tokens", 0)

            # Merge tool results into tool_calls (results align by index).
            merged_tool_calls = []
            for i, tc in enumerate(all_tool_calls):
                merged_tc = dict(tc)
                if i < len(all_tool_results):
                    merged_tc["result"] = all_tool_results[i]["content"]
                merged_tool_calls.append(merged_tc)

            # Save assistant message with all tool calls (including results)
            msg = Message(
                id=str(uuid.uuid4()),
                conversation_id=conv.id,
                role="assistant",
                content=message.get("content", ""),
                token_count=completion_tokens,
                thinking_content=message.get("reasoning_content", ""),
                tool_calls=json.dumps(merged_tool_calls) if merged_tool_calls else None,
            )
            db.session.add(msg)
            db.session.commit()

            user = get_or_create_default_user()
            record_token_usage(user.id, conv.model, prompt_tokens, completion_tokens)

            return ok({
                "message": to_dict(msg, thinking_content=msg.thinking_content or None),
                "usage": {
                    "prompt_tokens": prompt_tokens,
                    "completion_tokens": completion_tokens,
                    "total_tokens": usage.get("total_tokens", 0),
                },
            })

        # Process tool calls: echo assistant turn, run tools, append results.
        tool_calls = message["tool_calls"]
        all_tool_calls.extend(tool_calls)
        messages.append(message)
        tool_results = executor.process_tool_calls(tool_calls)
        all_tool_results.extend(tool_results)
        messages.extend(tool_results)

    return err(500, "exceeded maximum tool call iterations")


def _stream_response(conv, tools_enabled=True):
    """Stream response with tool call support (Server-Sent Events).

    Emits SSE events: thinking, message, tool_calls, tool_result, done,
    error. The generator runs outside the request context, so the app
    object and conversation id/model are captured up front and an
    app_context() is entered wherever the DB is touched.
    """
    conv_id = conv.id
    conv_model = conv.model
    app = current_app._get_current_object()
    executor = ToolExecutor(registry=registry)
    tools = registry.list_all() if tools_enabled else None
    # Build messages BEFORE entering generator (in request context)
    initial_messages = build_glm_messages(conv)

    def generate():
        messages = list(initial_messages)  # Copy to avoid mutation
        max_iterations = 5
        # Collect all tool calls and results
        all_tool_calls = []
        all_tool_results = []
        total_content = ""
        total_thinking = ""
        total_tokens = 0
        total_prompt_tokens = 0

        for iteration in range(max_iterations):
            full_content = ""
            full_thinking = ""
            token_count = 0
            prompt_tokens = 0
            msg_id = str(uuid.uuid4())
            tool_calls_list = []
            current_tool_call = None

            try:
                with app.app_context():
                    active_conv = db.session.get(Conversation, conv_id)
                    resp = _call_glm(active_conv, stream=True, tools=tools,
                                     messages=messages)
                    resp.raise_for_status()

                    for line in resp.iter_lines():
                        if not line:
                            continue
                        line = line.decode("utf-8")
                        if not line.startswith("data: "):
                            continue
                        data_str = line[6:]
                        if data_str == "[DONE]":
                            break
                        try:
                            chunk = json.loads(data_str)
                        except json.JSONDecodeError:
                            continue

                        delta = chunk["choices"][0].get("delta", {})

                        # Process thinking chain
                        reasoning = delta.get("reasoning_content", "")
                        if reasoning:
                            full_thinking += reasoning
                            yield f"event: thinking\ndata: {json.dumps({'content': reasoning}, ensure_ascii=False)}\n\n"

                        # Process text content
                        text = delta.get("content", "")
                        if text:
                            full_content += text
                            yield f"event: message\ndata: {json.dumps({'content': text}, ensure_ascii=False)}\n\n"

                        # Process tool calls: deltas arrive piecewise and are
                        # stitched together by index (arguments concatenate).
                        tool_calls_delta = delta.get("tool_calls", [])
                        for tc in tool_calls_delta:
                            idx = tc.get("index", 0)
                            if idx >= len(tool_calls_list):
                                tool_calls_list.append({
                                    "id": tc.get("id", ""),
                                    "type": tc.get("type", "function"),
                                    "function": {"name": "", "arguments": ""}
                                })
                            if tc.get("id"):
                                tool_calls_list[idx]["id"] = tc["id"]
                            if tc.get("function"):
                                if tc["function"].get("name"):
                                    tool_calls_list[idx]["function"]["name"] = tc["function"]["name"]
                                if tc["function"].get("arguments"):
                                    tool_calls_list[idx]["function"]["arguments"] += tc["function"]["arguments"]

                        usage = chunk.get("usage", {})
                        if usage:
                            token_count = usage.get("completion_tokens", 0)
                            prompt_tokens = usage.get("prompt_tokens", 0)
            except Exception as e:
                yield f"event: error\ndata: {json.dumps({'content': str(e)}, ensure_ascii=False)}\n\n"
                return

            # If tool calls exist, execute and continue loop
            if tool_calls_list:
                # Collect tool calls
                all_tool_calls.extend(tool_calls_list)
                # Send tool call info
                yield f"event: tool_calls\ndata: {json.dumps({'calls': tool_calls_list}, ensure_ascii=False)}\n\n"
                # Execute tools
                tool_results = executor.process_tool_calls(tool_calls_list)
                messages.append({
                    "role": "assistant",
                    "content": full_content or None,
                    "tool_calls": tool_calls_list
                })
                messages.extend(tool_results)
                # Collect tool results
                all_tool_results.extend(tool_results)
                # Send tool results
                for tr in tool_results:
                    yield f"event: tool_result\ndata: {json.dumps({'name': tr['name'], 'content': tr['content']}, ensure_ascii=False)}\n\n"
                continue

            # No tool calls, finish - save everything
            total_content = full_content
            total_thinking = full_thinking
            total_tokens = token_count
            total_prompt_tokens = prompt_tokens

            # Merge tool results into tool_calls (results align by index).
            merged_tool_calls = []
            for i, tc in enumerate(all_tool_calls):
                merged_tc = dict(tc)
                if i < len(all_tool_results):
                    merged_tc["result"] = all_tool_results[i]["content"]
                merged_tool_calls.append(merged_tc)

            with app.app_context():
                # Save assistant message with all tool calls (including results)
                msg = Message(
                    id=msg_id,
                    conversation_id=conv_id,
                    role="assistant",
                    content=total_content,
                    token_count=total_tokens,
                    thinking_content=total_thinking,
                    tool_calls=json.dumps(merged_tool_calls) if merged_tool_calls else None
                )
                db.session.add(msg)
                db.session.commit()
                user = get_or_create_default_user()
                record_token_usage(user.id, conv_model, total_prompt_tokens, total_tokens)

            yield f"event: done\ndata: {json.dumps({'message_id': msg_id, 'token_count': total_tokens})}\n\n"
            return

        yield f"event: error\ndata: {json.dumps({'content': 'exceeded maximum tool call iterations'}, ensure_ascii=False)}\n\n"

    return Response(generate(), mimetype="text/event-stream",
                    headers={"Cache-Control": "no-cache",
                             "X-Accel-Buffering": "no"})


def register_routes(app):
    """Attach this blueprint to the given Flask app."""
    app.register_blueprint(bp)