diff --git a/go.mod b/go.mod index 2b7c1dfd62..1ea9fd61a9 100644 --- a/go.mod +++ b/go.mod @@ -26,6 +26,7 @@ require ( github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/imdario/mergo v0.3.16 + github.com/mark3labs/mcp-go v0.43.0 github.com/nats-io/nats-server/v2 v2.10.27 github.com/nats-io/nats.go v1.39.1 github.com/prometheus/client_golang v1.19.1 @@ -70,7 +71,9 @@ require ( github.com/ajg/form v1.5.1 // indirect github.com/andybalholm/brotli v1.0.4 // indirect github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect + github.com/bahlo/generic-list-go v0.2.0 // indirect github.com/beorn7/perks v1.0.1 // indirect + github.com/buger/jsonparser v1.1.1 // indirect github.com/bytedance/sonic v1.11.3 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d // indirect @@ -120,6 +123,7 @@ require ( github.com/huandu/xstrings v1.4.0 // indirect github.com/imkira/go-interpol v1.1.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/invopop/jsonschema v0.13.0 // indirect github.com/jcmturner/aescts/v2 v2.0.0 // indirect github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect github.com/jcmturner/gofork v1.7.6 // indirect @@ -166,7 +170,7 @@ require ( github.com/shopspring/decimal v1.4.0 // indirect github.com/sourcegraph/conc v0.3.0 // indirect github.com/spf13/afero v1.11.0 // indirect - github.com/spf13/cast v1.6.0 // indirect + github.com/spf13/cast v1.7.1 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/subosito/gotenv v1.6.0 // indirect @@ -178,6 +182,7 @@ require ( github.com/ugorji/go/codec v1.2.12 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasthttp v1.37.0 // indirect + github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect github.com/x448/float16 v0.8.4 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect @@ -185,6 +190,7 @@ require ( github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect github.com/xeipuuv/gojsonschema v1.2.0 // indirect github.com/yalp/jsonpath v0.0.0-20180802001716-5cc68e5049a0 // indirect + github.com/yosida95/uritemplate/v3 v3.0.2 // indirect github.com/yudai/gojsondiff v1.0.0 // indirect github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 // indirect go.mongodb.org/mongo-driver v1.15.0 // indirect diff --git a/go.sum b/go.sum index c32852019f..c8ae4d4958 100644 --- a/go.sum +++ b/go.sum @@ -75,12 +75,16 @@ github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkY github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 h1:DklsrG3dyBCFEj5IhUbnKptjxatkF07cF2ak3yi77so= github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2/go.mod h1:WaHUgvxTVq04UNunO+XhnAqY/wQc+bxr74GqbsZ/Jqw= +github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk= +github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao= github.com/bsm/ginkgo/v2 v2.7.0/go.mod h1:AiKlXPm7ItEHNc/2+OkrNG4E0ITzojb9/xWzvQ9XZ9w= github.com/bsm/gomega v1.26.0 h1:LhQm+AFcgV2M0WyKroMASzAzCAJVpAxQXv4SaI9a69Y= github.com/bsm/gomega v1.26.0/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs= +github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= github.com/bytedance/sonic v1.10.0-rc/go.mod h1:ElCzW+ufi8qKqNW0FY314xriJhyJhuoJ3gFZdAHF7NM= github.com/bytedance/sonic v1.11.3 h1:jRN+yEjakWh8aK5FzrciUHG8OFXK+4/KrAX/ysEtHAA= @@ -373,6 +377,8 @@ github.com/imkira/go-interpol v1.1.0 h1:KIiKr0VSG2CUW1hl1jpiyuzuJeKUUpC8iM1AIE7N github.com/imkira/go-interpol v1.1.0/go.mod h1:z0h2/2T3XF8kyEPpRgJ3kmNv+C43p+I/CoI+jC3w2iA= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/invopop/jsonschema v0.13.0 h1:KvpoAJWEjR3uD9Kbm2HWJmqsEaHt8lBUpd0qHcIi21E= +github.com/invopop/jsonschema v0.13.0/go.mod h1:ffZ5Km5SWWRAIN6wbDXItl95euhFz2uON45H2qjYt+0= github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= @@ -429,6 +435,8 @@ github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= +github.com/mark3labs/mcp-go v0.43.0 h1:lgiKcWMddh4sngbU+hoWOZ9iAe/qp/m851RQpj3Y7jA= +github.com/mark3labs/mcp-go v0.43.0/go.mod h1:YnJfOL382MIWDx1kMY+2zsRHU/q78dBg9aFb8W6Thdw= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= @@ -557,8 +565,8 @@ github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTd github.com/spf13/afero v1.11.0 h1:WJQKhtpdm3v2IzqG8VMqrr6Rf3UYpEF239Jy9wNepM8= github.com/spf13/afero v1.11.0/go.mod h1:GH9Y3pIexgf1MTIWtNGyogA5MwRIDXGUr+hbWNoBjkY= github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= -github.com/spf13/cast v1.6.0 h1:GEiTHELF+vaR5dhz3VqZfFSzZjYbgeKDpBxQVS4GYJ0= -github.com/spf13/cast v1.6.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= +github.com/spf13/cast v1.7.1 h1:cuNEagBQEHWN1FnbGEjCXL2szYEXqfJPbP2HNUaca9Y= +github.com/spf13/cast v1.7.1/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM= github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= @@ -609,6 +617,8 @@ github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyC github.com/valyala/fasthttp v1.37.0 h1:7WHCyI7EAkQMVmrfBhWTCOaeROb1aCBiTopx63LkMbE= github.com/valyala/fasthttp v1.37.0/go.mod h1:t/G+3rLek+CyY9bnIE+YlMRddxVAAGjhxndDB4i4C0I= github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= +github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc= +github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= @@ -626,6 +636,8 @@ github.com/xeipuuv/gojsonschema v1.2.0 h1:LhYJRs+L4fBtjZUfuSZIKGeVu0QRy8e5Xi7D17 github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y= github.com/yalp/jsonpath v0.0.0-20180802001716-5cc68e5049a0 h1:6fRhSjgLCkTD3JnJxvaJ4Sj+TYblw757bqYgZaOq5ZY= github.com/yalp/jsonpath v0.0.0-20180802001716-5cc68e5049a0/go.mod h1:/LWChgwKmvncFJFHJ7Gvn9wZArjbV5/FppcK2fKk/tI= +github.com/yosida95/uritemplate/v3 v3.0.2 h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4= +github.com/yosida95/uritemplate/v3 v3.0.2/go.mod h1:ILOh0sOhIJR3+L/8afwt/kE++YT040gmv5BQTMR2HP4= github.com/yudai/gojsondiff v1.0.0 h1:27cbfqXLVEJ1o8I6v3y9lg8Ydm53EKqHXAOMxEGlCOA= github.com/yudai/gojsondiff v1.0.0/go.mod h1:AY32+k2cwILAkW1fbgxQ5mUmMiZFgLIV+FBNExI05xg= github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 h1:BHyfKlQyqbsFN5p3IfnEUduWvb9is428/nNb5L3U01M= diff --git a/server/apis/v1/handler.go b/server/apis/v1/handler.go index b1c5fa5aa7..74a0e0a182 100644 --- a/server/apis/v1/handler.go +++ b/server/apis/v1/handler.go @@ -48,7 +48,7 @@ import ( dfv1versiond "github.com/numaproj/numaflow/pkg/client/clientset/versioned" dfv1clients "github.com/numaproj/numaflow/pkg/client/clientset/versioned/typed/numaflow/v1alpha1" daemonclient "github.com/numaproj/numaflow/pkg/daemon/client" - mvtdaemonclient "github.com/numaproj/numaflow/pkg/mvtxdaemon/client" + mvtxdaemonclient "github.com/numaproj/numaflow/pkg/mvtxdaemon/client" "github.com/numaproj/numaflow/pkg/shared/util" "github.com/numaproj/numaflow/pkg/webhook/validator" "github.com/numaproj/numaflow/server/authn" @@ -92,16 +92,18 @@ func WithReadOnlyMode() HandlerOption { } type handler struct { - kubeClient kubernetes.Interface - metricsClient metricsclientv1beta1.MetricsV1beta1Interface - promQlServiceObj PromQl - numaflowClient dfv1clients.NumaflowV1alpha1Interface - daemonClientsCache *lru.Cache[string, daemonclient.DaemonClient] - mvtDaemonClientsCache *lru.Cache[string, mvtdaemonclient.MonoVertexDaemonClient] - dexObj *DexObject - localUsersAuthObject *LocalUsersAuthObject - healthChecker *HealthChecker - opts *handlerOptions + kubeClient kubernetes.Interface + metricsClient metricsclientv1beta1.MetricsV1beta1Interface + promQlServiceObj PromQl + numaflowClient dfv1clients.NumaflowV1alpha1Interface + daemonClientsCache *lru.Cache[string, daemonclient.DaemonClient] + mvtxDaemonClientsCache *lru.Cache[string, mvtxdaemonclient.MonoVertexDaemonClient] + dexObj *DexObject + localUsersAuthObject *LocalUsersAuthObject + healthChecker *HealthChecker + opts *handlerOptions + + mcpToolRegistry ToolRegistry } // NewHandler is used to provide a new instance of the handler type @@ -120,10 +122,10 @@ func NewHandler(ctx context.Context, dexObj *DexObject, localUsersAuthObject *Lo } metricsClient := metricsclientv1beta1.NewForConfigOrDie(k8sRestConfig) numaflowClient := dfv1versiond.NewForConfigOrDie(k8sRestConfig).NumaflowV1alpha1() - daemonClientsCache, _ := lru.NewWithEvict[string, daemonclient.DaemonClient](500, func(key string, value daemonclient.DaemonClient) { + daemonClientsCache, _ := lru.NewWithEvict(500, func(key string, value daemonclient.DaemonClient) { _ = value.Close() }) - mvtDaemonClientsCache, _ := lru.NewWithEvict[string, mvtdaemonclient.MonoVertexDaemonClient](500, func(key string, value mvtdaemonclient.MonoVertexDaemonClient) { + mvtxDaemonClientsCache, _ := lru.NewWithEvict(500, func(key string, value mvtxdaemonclient.MonoVertexDaemonClient) { _ = value.Close() }) @@ -133,20 +135,26 @@ func NewHandler(ctx context.Context, dexObj *DexObject, localUsersAuthObject *Lo opt(o) } } + mcpToolRegistry := NewMCPToolkit(kubeClient, numaflowClient, metricsClient, daemonClientsCache, mvtxDaemonClientsCache) return &handler{ - kubeClient: kubeClient, - metricsClient: metricsClient, - promQlServiceObj: promQlServiceObj, - numaflowClient: numaflowClient, - daemonClientsCache: daemonClientsCache, - mvtDaemonClientsCache: mvtDaemonClientsCache, - dexObj: dexObj, - localUsersAuthObject: localUsersAuthObject, - healthChecker: NewHealthChecker(ctx), - opts: o, + kubeClient: kubeClient, + metricsClient: metricsClient, + promQlServiceObj: promQlServiceObj, + numaflowClient: numaflowClient, + daemonClientsCache: daemonClientsCache, + mvtxDaemonClientsCache: mvtxDaemonClientsCache, + dexObj: dexObj, + localUsersAuthObject: localUsersAuthObject, + healthChecker: NewHealthChecker(ctx), + opts: o, + mcpToolRegistry: mcpToolRegistry, }, nil } +func (h *handler) GetMCPToolRegistry() ToolRegistry { + return h.mcpToolRegistry +} + // AuthInfo loads and returns auth info from cookie func (h *handler) AuthInfo(c *gin.Context) { if h.dexObj == nil && h.localUsersAuthObject == nil { @@ -162,7 +170,8 @@ func (h *handler) AuthInfo(c *gin.Context) { return } - if loginType == "dex" { + switch loginType { + case "dex": cookies := c.Request.Cookies() userIdentityTokenStr, err := common.JoinCookies(common.UserIdentityCookieName, cookies) if err != nil { @@ -198,7 +207,7 @@ func (h *handler) AuthInfo(c *gin.Context) { res := authn.NewUserInfo(&claims, userInfo.IDToken, userInfo.RefreshToken) c.JSON(http.StatusOK, NewNumaflowAPIResponse(nil, res)) return - } else if loginType == "local" { + case "local": userIdentityTokenStr, err := c.Cookie(common.JWTCookieName) if err != nil { errMsg := fmt.Sprintf("User is not authenticated, err: %s", err.Error()) @@ -226,10 +235,10 @@ func (h *handler) AuthInfo(c *gin.Context) { res := authn.NewUserInfo(&itc, userIdentityTokenStr, "") c.JSON(http.StatusOK, NewNumaflowAPIResponse(nil, res)) return + default: + errMsg := fmt.Sprintf("Unidentified login type received: %v", loginType) + c.JSON(http.StatusUnauthorized, NewNumaflowAPIResponse(&errMsg, nil)) } - - errMsg := fmt.Sprintf("Unidentified login type received: %v", loginType) - c.JSON(http.StatusUnauthorized, NewNumaflowAPIResponse(&errMsg, nil)) } // ListNamespaces is used to provide all the namespaces that have numaflow pipelines running @@ -399,7 +408,7 @@ func (h *handler) CreatePipeline(c *gin.Context) { // ListPipelines is used to provide all the numaflow pipelines in a given namespace func (h *handler) ListPipelines(c *gin.Context) { ns := c.Param("namespace") - plList, err := getPipelines(h, ns) + plList, err := getPipelines(h.numaflowClient, ns) if err != nil { h.respondWithError(c, fmt.Sprintf("Failed to fetch all pipelines for namespace %q, %s", ns, err.Error())) @@ -644,7 +653,7 @@ func (h *handler) CreateInterStepBufferService(c *gin.Context) { // ListInterStepBufferServices is used to provide all the interstepbuffer services in a namespace func (h *handler) ListInterStepBufferServices(c *gin.Context) { ns := c.Param("namespace") - isbList, err := getIsbServices(h, ns) + isbList, err := getIsbServices(h.numaflowClient, ns) if err != nil { h.respondWithError(c, fmt.Sprintf("Failed to fetch all interstepbuffer services for namespace %q, %s", ns, err.Error())) return @@ -1105,7 +1114,7 @@ func (h *handler) GetPipelineStatus(c *gin.Context) { // ListMonoVertices is used to provide all the mono vertices in a namespace. func (h *handler) ListMonoVertices(c *gin.Context) { ns := c.Param("namespace") - mvtList, err := getMonoVertices(h, ns) + mvtList, err := getMonoVertices(h.numaflowClient, ns) if err != nil { h.respondWithError(c, fmt.Sprintf("Failed to fetch all mono vertices for namespace %q, %s", ns, err.Error())) return @@ -1419,8 +1428,8 @@ func getAllNamespaces(h *handler) ([]string, error) { } // getPipelines is a utility used to fetch all the pipelines in a given namespace -func getPipelines(h *handler, namespace string) (Pipelines, error) { - plList, err := h.numaflowClient.Pipelines(namespace).List(context.Background(), metav1.ListOptions{}) +func getPipelines(numaflowClient dfv1clients.NumaflowV1alpha1Interface, namespace string) (Pipelines, error) { + plList, err := numaflowClient.Pipelines(namespace).List(context.Background(), metav1.ListOptions{}) if err != nil { return nil, err } @@ -1439,8 +1448,8 @@ func getPipelines(h *handler, namespace string) (Pipelines, error) { } // getIsbServices is used to fetch all the interstepbuffer services in a given namespace -func getIsbServices(h *handler, namespace string) (ISBServices, error) { - isbSvcs, err := h.numaflowClient.InterStepBufferServices(namespace).List(context.Background(), metav1.ListOptions{}) +func getIsbServices(numaflowClient dfv1clients.NumaflowV1alpha1Interface, namespace string) (ISBServices, error) { + isbSvcs, err := numaflowClient.InterStepBufferServices(namespace).List(context.Background(), metav1.ListOptions{}) if err != nil { return nil, err } @@ -1457,8 +1466,8 @@ func getIsbServices(h *handler, namespace string) (ISBServices, error) { } // getMonoVertices is a utility used to fetch all the mono vertices in a given namespace -func getMonoVertices(h *handler, namespace string) (MonoVertices, error) { - mvtList, err := h.numaflowClient.MonoVertices(namespace).List(context.Background(), metav1.ListOptions{}) +func getMonoVertices(numaflowClient dfv1clients.NumaflowV1alpha1Interface, namespace string) (MonoVertices, error) { + mvtList, err := numaflowClient.MonoVertices(namespace).List(context.Background(), metav1.ListOptions{}) if err != nil { return nil, err } @@ -1478,33 +1487,34 @@ func getMonoVertices(h *handler, namespace string) (MonoVertices, error) { // TODO(API): Change the Daemon service to return the consolidated status of the pipeline // to save on multiple calls to the daemon service func getPipelineStatus(pipeline *dfv1.Pipeline) (string, error) { - retStatus := dfv1.PipelineStatusHealthy - // Check if the pipeline is paused, if so, return inactive status - if pipeline.GetDesiredPhase() == dfv1.PipelinePhasePaused { - retStatus = dfv1.PipelineStatusInactive - } else if pipeline.GetDesiredPhase() == dfv1.PipelinePhaseRunning { - retStatus = dfv1.PipelineStatusHealthy - } else if pipeline.GetDesiredPhase() == dfv1.PipelinePhaseFailed { - retStatus = dfv1.PipelineStatusCritical + switch pipeline.GetDesiredPhase() { + case dfv1.PipelinePhasePaused: + return dfv1.PipelineStatusInactive, nil + case dfv1.PipelinePhaseRunning: + return dfv1.PipelineStatusHealthy, nil + case dfv1.PipelinePhaseFailed: + return dfv1.PipelineStatusCritical, nil + default: + return dfv1.PipelineStatusHealthy, nil } - return retStatus, nil } // GetIsbServiceStatus is used to provide the status of a given InterStepBufferService // TODO: Figure out the correct way to determine if a ISBService is healthy func getIsbServiceStatus(isbsvc *dfv1.InterStepBufferService) (string, error) { - retStatus := ISBServiceStatusHealthy - if isbsvc.Status.Phase == dfv1.ISBSvcPhaseUnknown { - retStatus = ISBServiceStatusInactive - } else if isbsvc.Status.Phase == dfv1.ISBSvcPhasePending || isbsvc.Status.Phase == dfv1.ISBSvcPhaseRunning { - retStatus = ISBServiceStatusHealthy - } else if isbsvc.Status.Phase == dfv1.ISBSvcPhaseFailed { - retStatus = ISBServiceStatusCritical + switch isbsvc.Status.Phase { + case dfv1.ISBSvcPhaseUnknown: + return ISBServiceStatusInactive, nil + case dfv1.ISBSvcPhasePending, dfv1.ISBSvcPhaseRunning: + return ISBServiceStatusHealthy, nil + case dfv1.ISBSvcPhaseFailed: + return ISBServiceStatusCritical, nil + default: + return ISBServiceStatusHealthy, nil } - return retStatus, nil } -func getMonoVertexStatus(mvt *dfv1.MonoVertex) (string, error) { +func getMonoVertexStatus(_ *dfv1.MonoVertex) (string, error) { // TODO - add more logic to determine the status of a mono vertex return dfv1.MonoVertexStatusHealthy, nil } @@ -1626,20 +1636,20 @@ func (h *handler) getPipelineDaemonClient(ns, pipeline string) (daemonclient.Dae } } -func (h *handler) getMonoVertexDaemonClient(ns, mvtName string) (mvtdaemonclient.MonoVertexDaemonClient, error) { - if mvtDaemonClient, ok := h.mvtDaemonClientsCache.Get(monoVertexDaemonSvcAddress(ns, mvtName)); !ok { +func (h *handler) getMonoVertexDaemonClient(ns, mvtName string) (mvtxdaemonclient.MonoVertexDaemonClient, error) { + if mvtDaemonClient, ok := h.mvtxDaemonClientsCache.Get(monoVertexDaemonSvcAddress(ns, mvtName)); !ok { var err error - var c mvtdaemonclient.MonoVertexDaemonClient + var c mvtxdaemonclient.MonoVertexDaemonClient // Default to use gRPC client if strings.EqualFold(h.opts.daemonClientProtocol, "http") { - c, err = mvtdaemonclient.NewRESTfulClient(monoVertexDaemonSvcAddress(ns, mvtName)) + c, err = mvtxdaemonclient.NewRESTfulClient(monoVertexDaemonSvcAddress(ns, mvtName)) } else { - c, err = mvtdaemonclient.NewGRPCClient(monoVertexDaemonSvcAddress(ns, mvtName)) + c, err = mvtxdaemonclient.NewGRPCClient(monoVertexDaemonSvcAddress(ns, mvtName)) } if err != nil { return nil, err } - h.mvtDaemonClientsCache.Add(monoVertexDaemonSvcAddress(ns, mvtName), c) + h.mvtxDaemonClientsCache.Add(monoVertexDaemonSvcAddress(ns, mvtName), c) return c, nil } else { return mvtDaemonClient, nil diff --git a/server/apis/v1/mcp_tools.go b/server/apis/v1/mcp_tools.go new file mode 100644 index 0000000000..9b1c102886 --- /dev/null +++ b/server/apis/v1/mcp_tools.go @@ -0,0 +1,126 @@ +/* +Copyright 2022 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1 + +import ( + "context" + "encoding/json" + + lru "github.com/hashicorp/golang-lru/v2" + "github.com/mark3labs/mcp-go/mcp" + "github.com/mark3labs/mcp-go/server" + "k8s.io/client-go/kubernetes" + metricsclientv1beta1 "k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1" + + // metricsclientv1beta1 "k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1bta1" + + dfv1clients "github.com/numaproj/numaflow/pkg/client/clientset/versioned/typed/numaflow/v1alpha1" + daemonclient "github.com/numaproj/numaflow/pkg/daemon/client" + mvtdaemonclient "github.com/numaproj/numaflow/pkg/mvtxdaemon/client" +) + +// ToolRegistry interface for registering tools +type ToolRegistry interface { + RegisteredTools() []ToolDefinition +} + +// ToolDefinition represents a tool with its handler +type ToolDefinition struct { + Tool mcp.Tool + Handler server.ToolHandlerFunc +} + +type toolkit struct { + kubeClient kubernetes.Interface + numaflowClient dfv1clients.NumaflowV1alpha1Interface + metricsClient metricsclientv1beta1.MetricsV1beta1Interface + daemonClientsCache *lru.Cache[string, daemonclient.DaemonClient] + mvtxDaemonClientsCache *lru.Cache[string, mvtdaemonclient.MonoVertexDaemonClient] +} + +func NewMCPToolkit(kubeClient kubernetes.Interface, + numaflowClient dfv1clients.NumaflowV1alpha1Interface, + metricsClient metricsclientv1beta1.MetricsV1beta1Interface, + daemonClientsCache *lru.Cache[string, daemonclient.DaemonClient], + mvtxDaemonClientsCache *lru.Cache[string, mvtdaemonclient.MonoVertexDaemonClient]) ToolRegistry { + return &toolkit{ + kubeClient: kubeClient, + numaflowClient: numaflowClient, + metricsClient: metricsClient, + daemonClientsCache: daemonClientsCache, + mvtxDaemonClientsCache: mvtxDaemonClientsCache, + } +} + +var _ ToolRegistry = (*toolkit)(nil) + +func (tk *toolkit) RegisteredTools() []ToolDefinition { + return []ToolDefinition{ + { + mcp.NewTool("list_pipelines", + mcp.WithDescription("A tool to list all the Pipeline objects in a specified namespace, each Pipeline object also contains healthy status (healthy, unknown, critical, warning, inactive)"), + mcp.WithString("namespace", + mcp.Required(), + mcp.Description("The namespace of the Pipelines"), + ), + ), + tk.listPipelines, + }, + { + mcp.NewTool("list_monovertices", + mcp.WithDescription("A tool to list all the MonoVertex objects in a specified namespace, each MonoVertex also contains healthy status (healthy, critical, warning)"), + mcp.WithString("namespace", + mcp.Required(), + mcp.Description("The namespace of the MonoVertices"), + ), + ), + tk.listMonoVertices, + }, + } +} + +func (tk *toolkit) listPipelines(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) { + namespace, err := request.RequireString("namespace") + if err != nil { + return mcp.NewToolResultError("namespace is required"), nil + } + pls, err := getPipelines(tk.numaflowClient, namespace) + if err != nil { + return mcp.NewToolResultError(err.Error()), nil + } + bs, err := json.Marshal(pls) + if err != nil { + return mcp.NewToolResultError(err.Error()), nil + } + return mcp.NewToolResultText(string(bs)), nil +} + +func (tk *toolkit) listMonoVertices(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) { + namespace, err := request.RequireString("namespace") + if err != nil { + return mcp.NewToolResultError("namespace is required"), nil + } + mvs, err := getMonoVertices(tk.numaflowClient, namespace) + if err != nil { + return mcp.NewToolResultError(err.Error()), nil + } + bs, err := json.Marshal(mvs) + if err != nil { + return mcp.NewToolResultError(err.Error()), nil + } + return mcp.NewToolResultText(string(bs)), nil +} diff --git a/server/routes/routes.go b/server/routes/routes.go index 26b8e602de..12f0c1e463 100644 --- a/server/routes/routes.go +++ b/server/routes/routes.go @@ -20,9 +20,12 @@ import ( "context" "fmt" "net/http" + "time" "github.com/gin-gonic/gin" + mcpserver "github.com/mark3labs/mcp-go/server" + "github.com/numaproj/numaflow" "github.com/numaproj/numaflow/pkg/shared/logging" v1 "github.com/numaproj/numaflow/server/apis/v1" "github.com/numaproj/numaflow/server/authn" @@ -189,6 +192,33 @@ func v1Routes(ctx context.Context, r gin.IRouter, dexObj *v1.DexObject, localUse r.POST("/metrics-proxy", handler.GetMetricData) // Discover the metrics for a given object type. r.GET("/metrics-discovery/object/:object", handler.DiscoverMetrics) + + // MCP Server + s := mcpserver.NewMCPServer("numaflow-mcp", numaflow.GetVersion().String(), + mcpserver.WithToolCapabilities(false), + mcpserver.WithResourceCapabilities(true, true), + mcpserver.WithRecovery(), + mcpserver.WithLogging(), + ) + + for _, t := range handler.GetMCPToolRegistry().RegisteredTools() { + s.AddTool(t.Tool, t.Handler) + } + + httpServer := mcpserver.NewStreamableHTTPServer(s, + mcpserver.WithHeartbeatInterval(30*time.Second), + mcpserver.WithStateLess(true), + // mcpserver.WithStreamableHTTPServer() + ) + r.GET("/mcp", func(c *gin.Context) { + httpServer.ServeHTTP(c.Writer, c.Request) + }) + r.POST("/mcp", func(c *gin.Context) { + httpServer.ServeHTTP(c.Writer, c.Request) + }) + r.DELETE("/mcp", func(c *gin.Context) { + httpServer.ServeHTTP(c.Writer, c.Request) + }) } // authMiddleware is the middleware for AuthN/AuthZ. @@ -210,17 +240,17 @@ func authMiddleware(ctx context.Context, authorizer authz.Authorizer, dexAuthent } // Authenticate the user based on the login type. - if loginType == "dex" { + switch loginType { + case "dex": userInfo, err = dexAuthenticator.Authenticate(c) - } else if loginType == "local" { + case "local": userInfo, err = localUsersAuthenticator.Authenticate(c) - } else { + default: errMsg := fmt.Sprintf("unidentified login type received: %v", loginType) c.JSON(http.StatusUnauthorized, v1.NewNumaflowAPIResponse(&errMsg, nil)) c.Abort() return } - if err != nil { errMsg := fmt.Sprintf("Failed to authenticate user: %v", err) c.JSON(http.StatusUnauthorized, v1.NewNumaflowAPIResponse(&errMsg, nil))