深入探索Spring AI:源码分析流式回答

在当今的数字时代,流式响应机制不仅提升了系统的性能,还在用户体验上扮演了关键角色。通过引入 Flux
首页 新闻资讯 行业资讯 深入探索Spring AI:源码分析流式回答

今天,我们将重点讲解流式响应的概念与实现。毕竟,AI的流式回答功能与其交互体验密切相关,是提升用户满意度的重要组成部分。

基本用法

基本用法非常简单,只需增加一个 stream 方法即可实现所需功能。接下来,我们将通过代码示例来展示这一过程,帮助您更清晰地理解如何在实际应用中进行操作。请看以下代码:

@GetMapping(value="/ai-stream",produces=MediaType.APPLICATION_OCTET_STREAM_VALUE+";charset=UTF-8")Flux<String>generationByStream(@RequestParam("userInput")String userInput){
    Flux<String>output=chatClient.prompt().user(userInput).stream().content();returnoutput;}

在我们增加 stream 方法之后,返回的对象类型将不再是原来的阻塞式 CallResponseSpec,而是转换为非阻塞的 StreamResponseSpec。与此同时,返回的数据类型也由之前的 String 变更为 Flux。

在深入探讨其具体应用之前,首先让我来介绍一下 Flux 的概念与特性。

Spring WebFlux的处理器实现

首先,在 WebFlux 中,处理器已经实现了非阻塞式的功能。这意味着,只要我们的代码返回一个 Flux 对象,就能轻松实现响应功能。通过这种方式,应用程序能够高效地处理并发请求,而不会因阻塞操作而影响整体性能。

@OverridepublicMono<Void>handle(ServerWebExchange exchange){if(this.handlerMappings==null){returncreateNotFoundError();}if(CorsUtils.isPreFlightRequest(exchange.getRequest())){returnhandlePreFlight(exchange);}returnFlux.fromIterable(this.handlerMappings).concatMap(mapping->mapping.getHandler(exchange)).next().switchIfEmpty(createNotFoundError()).onErrorResume(ex->handleResultMono(exchange,Mono.error(ex))).flatMap(handler->handleRequestWith(exchange,handler));}

这里简单介绍一下 Spring WebFlux,虽然这不是我们的重点,但了解其基本概念还是很有帮助的。Spring WebFlux 是 Spring 框架的一部分,专为构建反应式应用而设计。它支持异步和非阻塞的编程模型,使得处理高并发请求变得更加高效。以下是 WebFlux 的几个关键特性:

  1. 反应式编程:WebFlux 基于反应式编程模型,使用 Mono 和 Flux 类型来处理数据流。Mono 表示零或一个元素,而 Flux 则表示零个或多个元素。这种模型使得我们可以轻松处理异步数据流,从而提高代码的可读性和可维护性。

  2. 非阻塞 I/O:WebFlux 通过非阻塞的 I/O 操作(如 Netty 或 Servlet 3.1+ 容器)来实现高效的资源利用。与传统的阻塞 I/O 不同,WebFlux 在等待响应时能够释放线程,这样一来,就可以显著提高应用的并发能力,支持更多的同时请求而不增加线程开销。

了解这些特性将为后续的非阻塞式响应设计奠定基础,帮助我们更好地利用 WebFlux 的能力来提升应用性能。

源码分析

现在我们来详细看看我们的 content 是如何操作的。接下来的代码示例将展示具体的实现方式,帮助我们理解在 WebFlux 中如何处理数据流和响应:

publicFlux<String>content(){returndoGetFluxChatResponse(this.request).map(r->{if(r.getResult()==null||r.getResult().getOutput()==null||r.getResult().getOutput().getContent()==null){return"";}returnr.getResult().getOutput().getContent();}).filter(StringUtils::hasLength);}

这里的实现相对简单,主要是传入了一个函数。接下来,我们将深入分析 doGetFluxChatResponse 的代码实现,以便更好地理解其具体逻辑和运作方式:

private Flux<ChatResponse>doGetFluxChatResponse2(DefaultChatClientRequestSpec inputRequest){//此处省略重复代码var fluxChatResponse=this.chatModel.stream(prompt);//此处省略重复代码returnadvisedResponse;}

这里的代码逻辑与阻塞回答基本相同,唯一的不同之处在于它调用了 chatModel.stream(prompt) 方法。接下来,我们将深入探讨 chatModel.stream(prompt) 方法的具体实现和其背后的设计思路:

publicFlux<ChatResponse>stream(Prompt prompt){returnFlux.deferContextual(contextView->{//此处省略重复代码Flux<OpenAiApi.ChatCompletionChunk>completionChunks=this.openAiApi.chatCompletionStream(request,getAdditionalHttpHeaders(prompt));//此处省略重复代码Flux<ChatResponse>chatResponse=completionChunks.map(this::chunkToChatCompletion).switchMap(chatCompletion->Mono.just(chatCompletion).map(chatCompletion2->{//此处省略重复代码returnnew ChatResponse(generations,from(chatCompletion2,null));}
                }));//此处省略重复代码returnnew MessageAggregator().aggregate(flux,observationContext::setResponse);});}

同样的逻辑在这里就不再赘述,我们将重点关注其中的区别。在这一部分,我们使用了 chatCompletionStream,而且与之前不同的是,这里不再使用 retryTemplate,而是引入了 webClient,这是一个能够接收事件流的工具类。

publicFlux<ChatCompletionChunk>chatCompletionStream(ChatCompletionRequest chatRequest,MultiValueMap<String,String>additionalHttpHeader){

    Assert.notNull(chatRequest,"The request body can not be null.");Assert.isTrue(chatRequest.stream(),"Request must set the stream property to true.");AtomicBoolean isInsideTool=new AtomicBoolean(false);returnthis.webClient.post().uri(this.completionsPath).headers(headers->headers.addAll(additionalHttpHeader)).body(Mono.just(chatRequest),ChatCompletionRequest.class).retrieve().bodyToFlux(String.class)// cancels the flux stream after the "[DONE]" is received..takeUntil(SSE_DONE_PREDICATE)// filters out the "[DONE]" message..filter(SSE_DONE_PREDICATE.negate()).map(content->ModelOptionsUtils.jsonToObject(content,ChatCompletionChunk.class))//此处省略一堆代码

这段代码的主要目的是通过 webClient 向指定路径发起一个 POST 请求,同时设置合适的请求头和请求体。在获取响应数据时,使用了事件流的方式(通过 bodyToFlux 方法)来接收响应内容,并对数据进行过滤和转换,最终将其转化为 ChatCompletionChunk 对象。

尽管其余的业务逻辑与之前相似,但有一点显著的区别,即整个流程的返回类型以及与 OpenAI API 的调用方式都是非阻塞式的。

总结

在当今的数字时代,流式响应机制不仅提升了系统的性能,还在用户体验上扮演了关键角色。通过引入 Flux 类型,Spring WebFlux 的设计理念使得应用能够以非阻塞的方式处理并发请求,从而有效利用资源并减少响应延迟。

我们终于全面讲解了Spring AI的基本操作,包括阻塞式回答、流式回答以及记忆增强功能。这些内容为我们深入理解其工作机制奠定了基础。接下来,我们将继续深入探索源码,重点分析回调函数、实体类映射等重要功能。

这将帮助我们更好地理解Spring AI的内部运作原理,并为进一步的优化和定制化提供指导。

22    2024-10-14 13:30:20    Flux 类型 Spring