Conversation

Force-pushed 83dab4c to 76a2618:
Add the core Model Serving plugin that provides an authenticated proxy to Databricks Model Serving endpoints. Includes the connector layer (SDK client wrapper) and the plugin layer (Express routes for invoke/stream). Also adds the UPSTREAM_ERROR SSE error code for propagating API errors.

Signed-off-by: Pawel Kosiec <[email protected]>
Force-pushed 76a2618 to 41a0074:
The serving plugin was not forwarding the abort signal to the serving connector, unlike the Genie plugin. Without the signal, the connector's fetch request cannot be cancelled and the abort-check loop never triggers.

Signed-off-by: Pawel Kosiec <[email protected]>
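A minimal sketch of the fix described above, assuming hypothetical names (`invokeEndpoint` and `ServingInvokeOptions` are illustrative, not the actual connector API):

```typescript
// Illustrative sketch only: invokeEndpoint and ServingInvokeOptions are
// assumptions based on the PR description, not the real connector code.
interface ServingInvokeOptions {
  signal?: AbortSignal;
}

async function invokeEndpoint(
  url: string,
  body: Record<string, unknown>,
  options?: ServingInvokeOptions,
): Promise<unknown> {
  const res = await fetch(url, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(body),
    // The fix: forward the caller's signal so the request can be cancelled.
    signal: options?.signal,
  });
  if (!res.ok) {
    throw new Error(`Upstream error: ${res.status}`);
  }
  return res.json();
}
```

Without the `signal: options?.signal` line, aborting the controller on the plugin side has no effect on the in-flight request.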
```ts
logger.debug("Streaming from endpoint %s at %s", endpointName, url);

const res = await fetch(url, {
```
Did we use fetch because the WorkspaceClient does not have this method yet? (Same for the non-stream endpoint.)
```ts
  endpointName: string,
  body: Record<string, unknown>,
  options?: ServingInvokeOptions,
): Promise<unknown> {
```
We don't know the response type?
```json
"permission": "CAN_QUERY",
"fields": {
  "name": {
    "env": "DATABRICKS_SERVING_ENDPOINT",
```
As discussed offline - could we use name, since it might make it more clear that we're not expecting the actual URL?
```ts
buffer += decoder.decode(value, { stream: true });

if (buffer.length > MAX_BUFFER_SIZE) {
```
I don't have a strong opinion, but would an error be preferable?
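One way the guard could fail loudly instead of silently truncating, as the comment suggests — a sketch only, assuming a hypothetical `appendChunk` helper and a 1 MiB limit (both illustrative, not from the PR):

```typescript
// Sketch: fail loudly when no complete SSE event arrives within the limit.
// appendChunk and the 1 MiB value are assumptions for illustration.
const MAX_BUFFER_SIZE = 1024 * 1024;

function appendChunk(buffer: string, chunk: string): string {
  const next = buffer + chunk;
  if (next.length > MAX_BUFFER_SIZE) {
    // Throwing surfaces a misbehaving endpoint to the caller
    // instead of quietly dropping or truncating data.
    throw new Error(
      `SSE buffer exceeded ${MAX_BUFFER_SIZE} characters without a complete event`,
    );
  }
  return next;
}
```

The trade-off: an error aborts the whole stream, while a silent cap keeps it alive but can corrupt the next parsed event.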
```ts
exports(): ServingFactory {
  return ((alias?: string) => ({
    invoke: (body: Record<string, unknown>) =>
      this.invoke(alias ?? "default", body),
    stream: (body: Record<string, unknown>) =>
      this.stream(alias ?? "default", body),
  })) as ServingFactory;
```
This will break the asUser functionality; you can probably follow the same approach the files plugin is using.
```ts
// Always strip `stream` from the body — the connector controls this
const { stream: _stream, ...cleanBody } = body;

const headers = new Headers({
  "Content-Type": "application/json",
  Accept: "application/json",
});
await client.config.authenticate(headers);

logger.debug("Invoking endpoint %s at %s", endpointName, url);
```
A bit hacky, no? 😅 First, I wonder why we aren't using the client to call the model serving endpoint instead of our own fetch — is it because there's no streaming option? If that's the case, I would add a big comment explaining why we are doing this.
There's a caveat: not all of the endpoints support streaming.
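The body sanitisation shown above can be isolated into a small pure helper; a sketch (`stripStreamFlag` is an illustrative name, not from the PR):

```typescript
// Sketch of the sanitisation shown above: the connector, not the caller,
// decides whether a request streams, so any caller-supplied `stream` flag
// is dropped before the body is forwarded upstream.
function stripStreamFlag(
  body: Record<string, unknown>,
): Record<string, unknown> {
  const { stream: _stream, ...cleanBody } = body;
  return cleanBody;
}
```

Keeping this as a pure function makes the invariant (callers cannot force streaming on or off) easy to unit-test.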
```ts
const headers = new Headers({
  "Content-Type": "application/json",
  Accept: "text/event-stream",
});
await client.config.authenticate(headers);

logger.debug("Streaming from endpoint %s at %s", endpointName, url);

const res = await fetch(url, {
```
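For context on what the streaming loop does with the response body, here is a hedged sketch of SSE event extraction (`parseSseEvents` is illustrative; the actual connector's parsing may differ). SSE events are delimited by blank lines, and each `data:` line carries a payload chunk:

```typescript
// Sketch: split a text buffer into complete SSE events plus a remainder.
// The remainder is whatever trails the last blank-line delimiter and must
// be carried over into the next decoder chunk.
function parseSseEvents(buffer: string): { events: string[]; rest: string } {
  const parts = buffer.split("\n\n");
  const rest = parts.pop() ?? "";
  const events = parts
    .map((p) =>
      p
        .split("\n")
        .filter((l) => l.startsWith("data:"))
        .map((l) => l.slice(5).trim())
        .join("\n"),
    )
    .filter((e) => e.length > 0);
  return { events, rest };
}
```

This is also where a buffer-size guard would live: if `rest` keeps growing without a blank-line delimiter ever arriving, the endpoint is not actually emitting SSE.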
```ts
const cacheFile = path.join(
  process.cwd(),
  "node_modules",
  ".databricks",
  "appkit",
  ".appkit-serving-types-cache.json",
);
this.schemaAllowlists = await loadEndpointSchemas(cacheFile);
if (this.schemaAllowlists.size > 0) {
  logger.debug(
    "Loaded schema allowlists for %d endpoint(s)",
    this.schemaAllowlists.size,
  );
}
```
This assumes the Vite plugin is added to the server, right? But this PR doesn't include that plugin. I also don't like that we're reading this file from the plugin; we should probably follow the same approach we used for analytics.
```ts
injectRoutes(router: IAppRouter) {
  if (this.isNamedMode) {
    this.route(router, {
      name: "invoke",
      method: "post",
      path: "/:alias/invoke",
      handler: async (req: express.Request, res: express.Response) => {
        await this.asUser(req)._handleInvoke(req, res);
      },
    });

    this.route(router, {
      name: "stream",
      method: "post",
      path: "/:alias/stream",
      handler: async (req: express.Request, res: express.Response) => {
        await this.asUser(req)._handleStream(req, res);
      },
    });
  } else {
    this.route(router, {
      name: "invoke",
      method: "post",
      path: "/invoke",
      handler: async (req: express.Request, res: express.Response) => {
        req.params.alias = "default";
        await this.asUser(req)._handleInvoke(req, res);
      },
    });

    this.route(router, {
      name: "stream",
      method: "post",
      path: "/stream",
      handler: async (req: express.Request, res: express.Response) => {
        req.params.alias = "default";
        await this.asUser(req)._handleStream(req, res);
      },
    });
  }
```
So we do OBO on all routes by default?
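The named/default route split above implies the client-side paths; a sketch of how a caller might build them (`servingPath` is an illustrative helper, not part of the plugin, and the `/api/serving` prefix is taken from the PR summary):

```typescript
// Mirrors the routing above: named mode exposes /:alias/invoke|stream,
// while default mode exposes /invoke|stream and fills alias = "default"
// server-side before dispatching to the same handlers.
function servingPath(
  op: "invoke" | "stream",
  alias?: string,
): string {
  return alias
    ? `/api/serving/${alias}/${op}`
    : `/api/serving/${op}`;
}
```

Routing both modes through the same `_handleInvoke`/`_handleStream` handlers keeps the alias lookup in one place.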
Summary

- Adds /api/serving/:alias/invoke and /api/serving/:alias/stream routes
- Adds the UPSTREAM_ERROR SSE error code for propagating Databricks API errors

Demo
model-serving-demo-compressed.mp4
PR Stack — Model Serving