浏览代码

开发前后端sse通信功能提交

zsy 3 周之前
父节点
当前提交
aa6884fecf

+ 56 - 6
src/main/java/com/rf/AIquantum/dialogue/rest/DialogueController.java

@@ -1,6 +1,7 @@
 package com.rf.AIquantum.dialogue.rest;
 
 import cn.hutool.core.date.DateUtil;
+import cn.hutool.core.util.IdUtil;
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import com.auth0.jwt.interfaces.DecodedJWT;
@@ -9,11 +10,10 @@ import com.rf.AIquantum.dialogue.dao.model.ChatHistoryEntity;
 import com.rf.AIquantum.dialogue.dao.model.DialogueEntity;
 import com.rf.AIquantum.dialogue.service.ChatHistoryService;
 import com.rf.AIquantum.dialogue.service.DialogueService;
+import com.rf.AIquantum.filter.JwtIgnore;
 import com.rf.AIquantum.user.dao.model.UserEntity;
 import com.rf.AIquantum.user.service.UserService;
-import com.rf.AIquantum.utils.Constant;
-import com.rf.AIquantum.utils.JWTUtil;
-import com.rf.AIquantum.utils.Result;
+import com.rf.AIquantum.utils.*;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import org.apache.commons.io.FileUtils;
@@ -60,9 +60,12 @@ public class DialogueController extends BaseController {
     @Autowired
     private ChatHistoryService chatHistoryService;
 
+    @Autowired
+    private SseEmitterService sseEmitterService;
+
     @PostMapping("/saveChat")
     @ApiOperation(value = "保存对话",notes = "参数包括:phone:手机号, dialogueId:对话id(为空时是新建对话), content:消息内容,image:图片(可以为空)")
-    public Result saveChat(MultipartFile image, String phone, String dialogueId, String content) throws UnsupportedEncodingException {
+    public Result saveChat(MultipartFile image, String phone, String dialogueId, String content) throws UnsupportedEncodingException, InterruptedException {
         UserEntity user = this.userService.findUserByPhone(phone);
         if (user == null){
             return fail(null,"用户不存在");
@@ -111,7 +114,7 @@ public class DialogueController extends BaseController {
         chatHistoryEntity.setUpdateTime(DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss.SSS"));
         this.chatHistoryService.save(chatHistoryEntity);
         //调用模型相关操作
-        List<ChatHistoryEntity> chatHistoryEntities = this.chatHistoryService.findChatHistoryByDialogueIdAndStatus(dialogueId);
+        /*List<ChatHistoryEntity> chatHistoryEntities = this.chatHistoryService.findChatHistoryByDialogueIdAndStatus(dialogueId);
         JSONArray messages = new JSONArray();
         for (ChatHistoryEntity chatHistory : chatHistoryEntities) {
             JSONArray contents = new JSONArray();
@@ -134,6 +137,7 @@ public class DialogueController extends BaseController {
         }
         JSONObject jsonChat = new JSONObject();
         jsonChat.put("messages",messages);
+        jsonChat.put("stream",true);
 
         String url = Constant.INVOKE_IP_PROT + Constant.CHAT_PATH;
         String data = HttpClientChat(jsonChat,url);
@@ -141,7 +145,15 @@ public class DialogueController extends BaseController {
         if (jsonSystem == null || !jsonSystem.containsKey("response")) {
             return fail("", "模型服务内部错误");
         }
-        content = jsonSystem.getString("response");
+        content = jsonSystem.getString("response");*/
+        String sseContent = "";
+        content = "";
+        for (int i = 0; i < 10; i++) {
+            sseContent = "消息"+i;
+            content = content + sseContent;
+            sseEmitterService.sendMessage(dialogueId,sseContent);
+            Thread.sleep(1000);
+        }
         chatHistoryEntity = new ChatHistoryEntity();
         chatHistoryEntity.setDialogueId(dialogueId);
         chatHistoryEntity.setRole("system");
@@ -180,6 +192,44 @@ public class DialogueController extends BaseController {
         this.dialogueService.save(dialogueEntity);
         return success("修改成功");
     }
+    /*@GetMapping("/sendMsg")
+    @ApiOperation(value = "sse测试接口",notes = "参数包括:dialogueId:对话id")
+    @JwtIgnore
+    public Result sendMsg(@RequestParam String dialogueId) throws UnsupportedEncodingException {
+        List<ChatHistoryEntity> chatHistoryEntities = this.chatHistoryService.findChatHistoryByDialogueIdAndStatus(dialogueId);
+        JSONArray messages = new JSONArray();
+        for (ChatHistoryEntity chatHistory : chatHistoryEntities) {
+            JSONArray contents = new JSONArray();
+            JSONObject jsonText = new JSONObject();
+            jsonText.put("type","text");
+            jsonText.put("text",chatHistory.getContent());
+            if (chatHistory.getImage() != null && !chatHistory.getImage().equals("")) {
+                JSONObject jsonImage = new JSONObject();
+                jsonImage.put("type","image_url");
+                JSONObject jsonUrl = new JSONObject();
+                jsonUrl.put("url",chatHistory.getImage());
+                jsonImage.put("image_url",jsonUrl);
+                contents.add(jsonImage);
+            }
+            contents.add(jsonText);
+            JSONObject jsonRole = new JSONObject();
+            jsonRole.put("role",chatHistory.getRole());
+            jsonRole.put("content",contents);
+            messages.add(jsonRole);
+        }
+        JSONObject jsonChat = new JSONObject();
+        jsonChat.put("messages",messages);
+        jsonChat.put("stream",true);
+
+        String url = Constant.INVOKE_IP_PROT + Constant.CHAT_PATH;
+        String data = HttpClientChat(jsonChat,url);
+        JSONObject jsonSystem = JSONObject.parseObject(data);
+        if (jsonSystem == null || !jsonSystem.containsKey("response")) {
+            return fail("", "模型服务内部错误");
+        }
+        String content = jsonSystem.getString("response");
+        return success();
+    }*/
 
     public static String HttpClientChat(JSONObject jsonChat, String url) throws UnsupportedEncodingException {
         System.out.println("11:"+jsonChat);

+ 43 - 0
src/main/java/com/rf/AIquantum/dialogue/rest/SseController.java

@@ -0,0 +1,43 @@
+package com.rf.AIquantum.dialogue.rest;
+
+/**
+ * @Author:zzf
+ * @Date:2025/3/10:17:59
+ * @Description:
+ */
+import com.rf.AIquantum.filter.JwtIgnore;
+import com.rf.AIquantum.utils.SseEmitterService;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import org.springframework.http.MediaType;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
+
+import java.util.concurrent.TimeUnit;
+
+@RestController
+@RequestMapping("/sse")
+@Api(tags = "sse相关接口")
+public class SseController {
+
+    private final SseEmitterService sseEmitterService;
+
+    public SseController(SseEmitterService sseEmitterService) {
+        this.sseEmitterService = sseEmitterService;
+    }
+
+    @GetMapping(value = "/foundSse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
+    @ApiOperation(value = "创建sse连接接口",notes = "参数包括:dialogueId:对话id")
+    @JwtIgnore
+    public SseEmitter foundSse(String dialogueId) {
+        SseEmitter emitter = new SseEmitter(0L); // 不设置超时时间
+        sseEmitterService.addEmitter(dialogueId, emitter);
+
+        emitter.onCompletion(() -> sseEmitterService.removeEmitter(dialogueId));
+        emitter.onTimeout(() -> sseEmitterService.removeEmitter(dialogueId));
+
+        return emitter;
+    }
+}

+ 1 - 1
src/main/java/com/rf/AIquantum/utils/DateUtil.java

@@ -1,4 +1,4 @@
-package com.rf.fileCrack.utils;
+package com.rf.AIquantum.utils;
 
 import java.text.DateFormat;
 import java.text.DecimalFormat;

+ 1 - 1
src/main/java/com/rf/AIquantum/utils/EmailUtil.java

@@ -1,4 +1,4 @@
-package com.rf.fileCrack.utils;
+package com.rf.AIquantum.utils;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.net.smtp.SMTPClient;

+ 0 - 32
src/main/java/com/rf/AIquantum/utils/IPUtiles.java

@@ -1,32 +0,0 @@
-package com.rf.fileCrack.utils;
-
-import javax.servlet.http.HttpServletRequest;
-
-/**
- * @author lpf
- * @description:
- * @date 2021/12/2517:59
- */
-public class IPUtiles {
-    public static String getRealIp(HttpServletRequest request) {
-        String ip = request.getHeader("X-Real-IP");
-        if (ip == null || ip.length() == 0 || "unknown".equalsIgnoreCase(ip)) {
-            ip = request.getHeader("X-Forwarded-For");
-        }
-        if (ip == null || ip.length() == 0 || "unknown".equalsIgnoreCase(ip)) {
-            ip = request.getHeader("Proxy-Client-IP");
-        }
-        if (ip == null || ip.length() == 0 || "unknown".equalsIgnoreCase(ip)) {
-            ip = request.getHeader("WL-Proxy-Client-IP");
-        }
-        if (ip == null || ip.length() == 0 || "unknown".equalsIgnoreCase(ip)) {
-            ip = request.getRemoteAddr();
-        }
-        if (ip != null && ip.contains(",")) {
-            String[] ipArray = ip.split(",");
-            ip = ipArray[0];
-        }
-        return ip;
-    }
-
-}

+ 38 - 0
src/main/java/com/rf/AIquantum/utils/SseEmitterService.java

@@ -0,0 +1,38 @@
+package com.rf.AIquantum.utils;
+
+/**
+ * @Author:zzf
+ * @Date:2025/3/10:18:25
+ * @Description:
+ */
+import org.springframework.stereotype.Component;
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+@Component
+public class SseEmitterService {
+
+    private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
+
+    public void addEmitter(String clientId, SseEmitter emitter) {
+        emitters.put(clientId, emitter);
+    }
+
+    public void removeEmitter(String clientId) {
+        emitters.remove(clientId);
+    }
+
+    public void sendMessage(String clientId, Object message) {
+        SseEmitter emitter = emitters.get(clientId);
+        if (emitter != null) {
+            try {
+                emitter.send(message);
+            } catch (Exception e) {
+                // 处理异常,如网络问题等
+                emitters.remove(clientId);
+            }
+        }
+    }
+}