经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » Spring » 查看文章
智能工作流:Spring AI高效批量化提示访问方案
来源:cnblogs  作者:linkcxt  时间:2024/5/13 10:21:10  对本文有异议

基于SpringAI搭建系统,依靠线程池\负载均衡等技术进行请求优化,用于解决科研&开发过程中对GPT接口进行批量化接口请求中出现的问题。

github地址:https://github.com/linkcao/springai-wave

大语言模型接口以OpenAI的GPT 3.5为例,JDK版本为17,其他依赖版本可见仓库pom.xml

拟解决的问题

在处理大量提示文本时,存在以下挑战:

  1. API密钥请求限制: 大部分AI服务提供商对API密钥的请求次数有限制,单个密钥每分钟只能发送有限数量的请求。
  2. 处理速度慢: 大量的提示文本需要逐条发送请求,处理速度较慢,影响效率。
  3. 结果保存和分析困难: 处理完成的结果需要保存到本地数据库中,并进行后续的数据分析,但这一过程相对复杂。

解决方案

为了解决上述问题,本文提出了一种基于Spring框架的批量化提示访问方案,如下图所示:

image-20240511160521257

其中具体包括以下步骤:

  1. 多线程处理提示文本: 将每个提示文本看作一个独立的任务,采用线程池的方式进行多线程处理,提高处理效率。
  2. 动态分配API密钥: 在线程池初始化时,通过读取本地数据库中存储的API密钥信息,动态分配每个线程单元所携带的密钥,实现负载均衡。
  3. 结果保存和管理: 在请求完成后,将每个请求的问题和回答保存到本地数据库中,以便后续的数据分析和管理。
  4. 状态实时更新: 将整个批量请求任务区分为进行中、失败和完成状态,并通过数据库保存状态码实时更新任务状态,方便监控和管理。

关键代码示例

  1. 多线程异步请求提示信息(所在包: ChatService)
  1. // 线程池初始化
  2. private static final ExecutorService executor = Executors.newFixedThreadPool(10);
  3. /**
  4. * 多线程请求提示
  5. * @param prompts
  6. * @param user
  7. * @param task
  8. * @return
  9. */
  10. @Async
  11. public CompletableFuture<Void> processPrompts(List<String> prompts Users user Task task) {
  12. for (int i = 0; i < prompts.size();i++) {
  13. int finalI = i;
  14. // 提交任务
  15. executor.submit(() -> processPrompt(prompts.get(finalI), user finalI));
  16. }
  17. // 设置批量任务状态
  18. task.setStatus(TaskStatus.COMPLETED);
  19. taskService.setTask(task);
  20. return CompletableFuture.completedFuture(null);
  21. }
  • 如上所示,利用了Spring框架的@Async注解和线程池的功能,实现了多线程异步处理提示信息。

  • 首先,使用了ExecutorService创建了一个固定大小的线程池,以便同时处理多个提示文本。

  • 然后,通过CompletableFuture来实现异步任务的管理。

  • 在处理每个提示文本时,通过executor.submit()方法提交一个任务给线程池,让线程池来处理。

  • 处理完成后,将批量任务的状态设置为已完成,并更新任务状态。

  • 一个线程任务需要绑定请求的用户以及所在的批量任务,当前任务所分配的key由任务所在队列的下标决定。

  1. 处理单条提示信息(所在包: ChatService)
  1. /**
  2. * 处理单条提示文本
  3. * @param prompt 提示文本
  4. * @param user 用户
  5. * @param index 所在队列下标
  6. */
  7. public void processPrompt(String prompt Users user int index) {
  8. // 获取Api Key
  9. OpenAiApi openAiApi = getApiByIndex(user index);
  10. assert openAiApi != null;
  11. ChatClient client = new OpenAiChatClient(openAiApi);
  12. // 提示文本请求
  13. String response = client.call(prompt);
  14. // 日志记录
  15. log.info("提示信息" + prompt );
  16. log.info("输出" + response );
  17. // 回答保存数据库
  18. saveQuestionAndAnswer(user prompt response);
  19. }
  • 首先根据任务队列的下标获取对应的API密钥
  • 然后利用该密钥创建一个与AI服务进行通信的客户端。
  • 接着,使用客户端发送提示文本请求,并获取AI模型的回答。
  • 最后,将问题和回答保存到本地数据库和日志中,以便后续的数据分析和管理。
  1. Api Key 负载均衡(所在包: ChatService)
  1. /**
  2. * 采用任务下标分配key的方式进行负载均衡
  3. * @param index 任务下标
  4. * @return OpenAiApi
  5. */
  6. private OpenAiApi getApiByIndex(int index){
  7. List<KeyInfo> keyInfoList = keyRepository.findAll();
  8. if (keyInfoList.isEmpty()) {
  9. return null;
  10. }
  11. // 根据任务队列下标分配 Key
  12. KeyInfo keyInfo = keyInfoList.get(index % keyInfoList.size());
  13. return new OpenAiApi(keyInfo.getApi(),keyInfo.getKeyValue());
  14. }
  • 首先从本地数据库中获取所有可用的API密钥信息
  • 然后根据任务队列的下标来动态分配API密钥。
  • 确保每个线程单元都携带了不同的API密钥,避免了因为某个密钥请求次数达到限制而导致的请求失败问题。
  1. 依靠线程池批量请求GPT整体方法(所在包: ChatController)
  1. /**
  2. * 依靠线程池批量请求GPT
  3. * @param promptFile 传入的批量提示文件,每一行为一个提示语句
  4. * @param username 调用的用户
  5. * @return 处理状态
  6. */
  7. @PostMapping("/batch")
  8. public String batchPrompt(MultipartFile promptFile String username){
  9. if (promptFile.isEmpty()) {
  10. return "上传的文件为空";
  11. }
  12. // 批量请求任务
  13. Task task = new Task();
  14. try {
  15. BufferedReader reader = new BufferedReader(new InputStreamReader(promptFile.getInputStream()));
  16. List<String> prompts = new ArrayList<>();
  17. String line;
  18. while ((line = reader.readLine()) != null) {
  19. prompts.add(line);
  20. }
  21. // 用户信息请求
  22. Users user = userService.findByUsername(username);
  23. // 任务状态设置
  24. task.setFileName(promptFile.getName());
  25. task.setStartTime(LocalDateTime.now());
  26. task.setUserId(user.getUserId());
  27. task.setStatus(TaskStatus.PROCESSING);
  28. // 线程池处理
  29. chatService.processPrompts(prompts user task);
  30. return "文件上传成功,已开始批量处理提示";
  31. } catch ( IOException e) {
  32. // 处理失败
  33. e.printStackTrace();
  34. task.setStatus(TaskStatus.FAILED);
  35. return "上传文件时出错:" + e.getMessage();
  36. } finally {
  37. // 任务状态保存
  38. taskService.setTask(task);
  39. }
  40. }
  • 首先,接收用户上传的批量提示文件和用户名信息。
  • 然后,读取文件中的每一行提示文本,并将它们存储在一个列表中。
  • 接着,根据用户名信息找到对应的用户,并创建一个任务对象来跟踪批量处理的状态。
  • 最后,调用ChatService中的processPrompts()方法来处理提示文本,并返回处理状态给用户。

数据库ER图

所有信息都与用户ID强绑定,便于管理和查询,ER图如下所示:

image-20240511165330676

演示示例

  1. 通过postman携带批量请求文件username信息进行Post请求访问localhost:8080/batch接口:

image-20240511165636797

  1. 在实际应用中,可以根据具体需求对提示文本进行定制和扩展,以满足不同场景下的需求,演示所携带的请求文件内容如下:
  1. 请回答1+2=?
  2. 请回答8*12=?
  3. 请回答12*9=?
  4. 请回答321-12=?
  5. 请回答12/4=?
  6. 请回答32%2=?
  1. 最终返回的数据库结果,左为问题库,右为回答库:

image-20240511165910247

  • 问题库和答案库通过question_iduser_id进行绑定,由于一个问题可以让GPT回答多次,因此两者的关系为多对一,将问题和答案分在两个独立的表中也便于后续的垂域定制和扩展。

原文链接:https://www.cnblogs.com/linkcxt/p/18187163

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号