| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257 |
- package handler
- import (
- "context"
- "encoding/json"
- "fmt"
- "io"
- "net/http"
- "strings"
- "time"
- "yunion.io/x/log"
- "yunion.io/x/pkg/errors"
- "yunion.io/x/pkg/util/httputils"
- "yunion.io/x/onecloud/pkg/apigateway/options"
- "yunion.io/x/onecloud/pkg/appsrv"
- "yunion.io/x/onecloud/pkg/httperrors"
- "yunion.io/x/onecloud/pkg/mcclient/auth"
- modules "yunion.io/x/onecloud/pkg/mcclient/modules/llm"
- mcpServerOption "yunion.io/x/onecloud/pkg/mcp-server/options"
- )
- func mcpServersConfigHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
- serviceName := "mcp-server"
- url, err := auth.GetPublicServiceURL(serviceName, options.Options.Region, "", httputils.GET)
- if err != nil {
- log.Warningf("GetPublicServiceURL for %s failed: %v", serviceName, err)
- }
- sseURL := fmt.Sprintf("%s/sse", url)
- responseType := r.URL.Query().Get("type")
- switch responseType {
- case "claude":
- // Claude 仅支持单个自定义 header,使用 X-API-Key。填写方式:
- // base64(ak:sk):`echo -n "你的AK:你的SK" | base64`,将输出填入
- cmd := fmt.Sprintf("claude mcp add --transport sse %s --header \"X-API-Key: <填写 token 或 base64(AK:SK)>\"", sseURL)
- w.Header().Set("Content-Type", "text/plain; charset=utf-8")
- w.Write([]byte(cmd))
- return
- case "cursor":
- // fall through to JSON
- default:
- // default: return JSON (cursor format)
- }
- // Cursor:在 headers 中填写控制台/CLI 获取的 Access Key 与 Secret Key
- config := map[string]interface{}{
- "mcpServers": map[string]interface{}{
- mcpServerOption.Options.MCPServerName: map[string]interface{}{
- "url": sseURL,
- "headers": map[string]string{
- "AK": "<填写 Access Key>",
- "SK": "<填写 Secret Key>",
- },
- },
- },
- }
- w.Header().Set("Content-Type", "application/json")
- json.NewEncoder(w).Encode(config)
- }
- func chatHandlerInfo(method, prefix string, handler func(context.Context, http.ResponseWriter, *http.Request)) *appsrv.SHandlerInfo {
- log.Debugf("%s - %s", method, prefix)
- hi := appsrv.SHandlerInfo{}
- hi.SetMethod(method)
- hi.SetPath(prefix)
- hi.SetHandler(handler)
- hi.SetProcessTimeout(6 * time.Hour)
- // Use default worker manager with default pool size (usually 32)
- // instead of uploader worker which has limited pool size (4)
- return &hi
- }
- func mcpAgentChatStreamHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
- params, _, body := appsrv.FetchEnv(ctx, w, r)
- id := params["<id>"]
- if len(id) == 0 {
- httperrors.MissingParameterError(ctx, w, "id")
- return
- }
- token := AppContextToken(ctx)
- s := auth.GetSession(ctx, token, FetchRegion(r))
- // Prepare request to backend
- headers := http.Header{}
- headers.Set("Content-Type", "application/json")
- // Forward the request body to the backend
- var bodyReader io.Reader
- if body != nil {
- bodyStr := body.String()
- bodyReader = strings.NewReader(bodyStr)
- }
- path := fmt.Sprintf("/mcp_agents/%s/chat-stream", id)
- resp, err := s.RawVersionRequest(
- modules.MCPAgent.ServiceType(),
- modules.MCPAgent.EndpointType(),
- "POST",
- path,
- headers,
- bodyReader,
- )
- if err != nil {
- httperrors.GeneralServerError(ctx, w, errors.Wrap(err, "request backend"))
- return
- }
- defer resp.Body.Close()
- if resp.StatusCode != 200 {
- // Read error body
- respBody, _ := io.ReadAll(resp.Body)
- // Try to parse as JSON error if possible, or just return as is
- if resp.StatusCode >= 400 && resp.StatusCode < 500 {
- httperrors.InputParameterError(ctx, w, "backend error: %s", string(respBody))
- } else {
- httperrors.GeneralServerError(ctx, w, fmt.Errorf("backend error %d: %s", resp.StatusCode, string(respBody)))
- }
- return
- }
- // Set SSE headers
- w.Header().Set("Content-Type", "text/event-stream")
- w.Header().Set("Cache-Control", "no-cache")
- w.Header().Set("Connection", "keep-alive")
- // For now just standard SSE headers.
- if f, ok := w.(http.Flusher); ok {
- f.Flush()
- }
- // Stream the response from backend to client
- buf := make([]byte, 1024)
- for {
- n, err := resp.Body.Read(buf)
- if n > 0 {
- if _, wErr := w.Write(buf[:n]); wErr != nil {
- log.Errorf("write response error: %v", wErr)
- return
- }
- if f, ok := w.(http.Flusher); ok {
- f.Flush()
- }
- }
- if err != nil {
- if err != io.EOF {
- log.Errorf("read backend response error: %v", err)
- }
- break
- }
- }
- }
- // mcpAgentDefaultChatStreamHandler 将请求转发到 region 的 default-chat-stream(使用 default_agent=true 的条目)
- func mcpAgentDefaultChatStreamHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
- token := AppContextToken(ctx)
- s := auth.GetSession(ctx, token, FetchRegion(r))
- headers := http.Header{}
- headers.Set("Content-Type", "application/json")
- var bodyReader io.Reader
- if r.Body != nil {
- bodyReader = r.Body
- }
- path := "/mcp_agents/default-chat-stream"
- resp, err := s.RawVersionRequest(
- modules.MCPAgent.ServiceType(),
- modules.MCPAgent.EndpointType(),
- "POST",
- path,
- headers,
- bodyReader,
- )
- if err != nil {
- httperrors.GeneralServerError(ctx, w, errors.Wrap(err, "request backend"))
- return
- }
- defer resp.Body.Close()
- if resp.StatusCode != 200 {
- respBody, _ := io.ReadAll(resp.Body)
- if resp.StatusCode >= 400 && resp.StatusCode < 500 {
- httperrors.InputParameterError(ctx, w, "backend error: %s", string(respBody))
- } else {
- httperrors.GeneralServerError(ctx, w, fmt.Errorf("backend error %d: %s", resp.StatusCode, string(respBody)))
- }
- return
- }
- w.Header().Set("Content-Type", "text/event-stream")
- w.Header().Set("Cache-Control", "no-cache")
- w.Header().Set("Connection", "keep-alive")
- if f, ok := w.(http.Flusher); ok {
- f.Flush()
- }
- buf := make([]byte, 1024)
- for {
- n, err := resp.Body.Read(buf)
- if n > 0 {
- if _, wErr := w.Write(buf[:n]); wErr != nil {
- log.Errorf("write response error: %v", wErr)
- return
- }
- if f, ok := w.(http.Flusher); ok {
- f.Flush()
- }
- }
- if err != nil {
- if err != io.EOF {
- log.Errorf("read backend response error: %v", err)
- }
- break
- }
- }
- }
- // mcpAgentDefaultToolsHandler 将 GET 请求转发到 region 的 default-mcp-tools(仅使用 options.MCPServerURL,不通过 mcp_agent 条目)
- func mcpAgentDefaultToolsHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
- token := AppContextToken(ctx)
- s := auth.GetSession(ctx, token, FetchRegion(r))
- path := "/mcp_agents/default-mcp-tools"
- resp, err := s.RawVersionRequest(
- modules.MCPAgent.ServiceType(),
- modules.MCPAgent.EndpointType(),
- "GET",
- path,
- nil,
- nil,
- )
- if err != nil {
- httperrors.GeneralServerError(ctx, w, errors.Wrap(err, "request backend"))
- return
- }
- defer resp.Body.Close()
- if resp.StatusCode != 200 {
- respBody, _ := io.ReadAll(resp.Body)
- if resp.StatusCode >= 400 && resp.StatusCode < 500 {
- httperrors.InputParameterError(ctx, w, "backend error: %s", string(respBody))
- } else {
- httperrors.GeneralServerError(ctx, w, fmt.Errorf("backend error %d: %s", resp.StatusCode, string(respBody)))
- }
- return
- }
- w.Header().Set("Content-Type", "application/json")
- _, err = io.Copy(w, resp.Body)
- if err != nil {
- log.Errorf("write default mcp tools response error: %v", err)
- }
- }
|