Browse Source

refactor(stream-mq): 重构消息队列配置和消费者实现

- 修改应用端口从9402到9412
- 移除RabbitMQ、RocketMQ和Kafka的stream配置
- 删除LogStreamConsumer和TestStreamConsumer消费者类
- 删除LogStreamProducer和TestStreamProducer生产者类
- 从控制器中移除RocketMQ和Kafka消息发送接口
- 更新Nacos配置导入方式
- 修改延迟消费者的消息类型从TestMessaging到String
- 在抖音工具类中更新正则表达式匹配房间号逻辑
- 添加获取抖音弹幕的测试方法
- 从抖音工具类中清理不必要的导入和代码
- 在模型模块的pom文件中添加必要的依赖声明
JX.Li 2 weeks ago
parent
commit
64d20a8e33

+ 0 - 0
nexo-example/nexo-model/config/sin.js → nexo-example/nexo-model/config/geta_b.js


+ 1 - 0
nexo-example/nexo-model/pom.xml

@@ -17,6 +17,7 @@
 
 
     <dependencies>
     <dependencies>
 
 
+
         <!-- SpringCloud Alibaba Nacos -->
         <!-- SpringCloud Alibaba Nacos -->
         <dependency>
         <dependency>
             <groupId>com.alibaba.cloud</groupId>
             <groupId>com.alibaba.cloud</groupId>

+ 1 - 1
nexo-example/nexo-model/src/main/java/com/nexo/model/douyin/utils/DouYinUtils.java

@@ -35,7 +35,7 @@ public class DouYinUtils {
         String property = System.getProperty("user.dir");
         String property = System.getProperty("user.dir");
         // 执行JS文件
         // 执行JS文件
         try {
         try {
-            engine.eval(new FileReader(property + "\\config\\sin.js"));
+            engine.eval(new FileReader(property + "\\config\\geta_b.js"));
         } catch (ScriptException e) {
         } catch (ScriptException e) {
             throw new RuntimeException(e);
             throw new RuntimeException(e);
         } catch (FileNotFoundException e) {
         } catch (FileNotFoundException e) {

+ 36 - 12
nexo-example/nexo-model/src/test/java/com/nexo/model/douyin/抖音测试工具类.java

@@ -9,20 +9,11 @@ import lombok.extern.slf4j.Slf4j;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Test;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.boot.test.context.SpringBootTest;
 
 
-import javax.script.Invocable;
-import javax.script.ScriptEngine;
-import javax.script.ScriptEngineManager;
-import javax.script.ScriptException;
-import java.io.FileReader;
+import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.io.UnsupportedEncodingException;
 import java.net.URLDecoder;
 import java.net.URLDecoder;
 import java.net.URLEncoder;
 import java.net.URLEncoder;
-import java.nio.charset.StandardCharsets;
-import java.util.*;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import static org.apache.ibatis.ognl.DynamicSubscript.mid;
+import java.util.HashMap;
 
 
 @Slf4j
 @Slf4j
 @SpringBootTest
 @SpringBootTest
@@ -187,7 +178,7 @@ public class 抖音测试工具类 {
         HashMap<String, String> hashMap = new HashMap<>();
         HashMap<String, String> hashMap = new HashMap<>();
         hashMap.put("User-Agent", "Mozilla/5.0 (iPhone; CPU iPhone OS 18_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/18.5 Mobile/15E148 Safari/604.1 Edg/148.0.0.0");
         hashMap.put("User-Agent", "Mozilla/5.0 (iPhone; CPU iPhone OS 18_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/18.5 Mobile/15E148 Safari/604.1 Edg/148.0.0.0");
         String done = OkHttpClientUtils.doGet(url, null, hashMap);
         String done = OkHttpClientUtils.doGet(url, null, hashMap);
-        String roomId = DouYinUtils.getRegexString(done, "roomId=\"(.*?)\"", 1);
+        String roomId = DouYinUtils.getRegexString(done, "roomId[^\\d]*(\\d{15,})", 1);
         if (StringUtils.isEmpty(roomId)) {
         if (StringUtils.isEmpty(roomId)) {
             throw new RuntimeException("未找到房间号");
             throw new RuntimeException("未找到房间号");
         }
         }
@@ -227,4 +218,37 @@ public class 抖音测试工具类 {
 
 
     }
     }
 
 
+    @Test
+    public void 获取抖音弹幕() {
+        String liveId = "561090451047";
+        try {
+
+            // 设置工作目录
+            String workDir = "C:\\Users\\Administrator\\Downloads\\DouyinLiveWebFetcher-main";
+
+            // 构建命令
+            String command = "cmd /c start cmd /k \"python main.py --live_id=" + liveId + "\"";
+
+            System.out.println("执行命令: " + command);
+
+            // 使用 ProcessBuilder 执行命令
+            ProcessBuilder processBuilder = new ProcessBuilder("cmd", "/c", command);
+            processBuilder.directory(new java.io.File(workDir));
+            // 合并错误流和输出流
+            processBuilder.redirectErrorStream(true);
+
+            // 启动进程
+            Process process = processBuilder.start();
+            // 等待进程自然结束
+            int exitCode = process.waitFor();
+            System.out.println("进程退出码: " + exitCode);
+
+
+        } catch (IOException e) {
+            e.printStackTrace();
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
 }
 }

+ 0 - 10
nexo-example/nexo-stream-mq/pom.xml

@@ -34,16 +34,6 @@
             <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
             <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
         </dependency>
         </dependency>
 
 
-        <dependency>
-            <groupId>com.alibaba.cloud</groupId>
-            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.springframework.cloud</groupId>
-            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
-        </dependency>
-
         <dependency>
         <dependency>
             <groupId>com.nexo</groupId>
             <groupId>com.nexo</groupId>
             <artifactId>nexo-common-sentinel</artifactId>
             <artifactId>nexo-common-sentinel</artifactId>

+ 0 - 26
nexo-example/nexo-stream-mq/src/main/java/com/nexo/stream/controller/TestMqController.java

@@ -2,8 +2,6 @@ package com.nexo.stream.controller;
 
 
 import com.nexo.common.core.domain.R;
 import com.nexo.common.core.domain.R;
 import com.nexo.stream.mq.producer.DelayProducer;
 import com.nexo.stream.mq.producer.DelayProducer;
-import com.nexo.stream.mq.producer.LogStreamProducer;
-import com.nexo.stream.mq.producer.TestStreamProducer;
 import lombok.AllArgsConstructor;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.GetMapping;
@@ -20,8 +18,6 @@ import org.springframework.web.bind.annotation.RestController;
 public class TestMqController {
 public class TestMqController {
 
 
     private final DelayProducer delayProducer;
     private final DelayProducer delayProducer;
-    private final TestStreamProducer testStreamProducer;
-    private final LogStreamProducer logStreamProducer;
 
 
     /**
     /**
      * 发送消息Rabbitmq
      * 发送消息Rabbitmq
@@ -35,26 +31,4 @@ public class TestMqController {
         return R.ok();
         return R.ok();
     }
     }
 
 
-    /**
-     * 发送消息Rocketmq
-     *
-     * @param msg 消息内容
-     */
-    @GetMapping("/sendRocketmq")
-    public R<Void> sendRocketmq(String msg) {
-        testStreamProducer.streamTestMsg(msg);
-        return R.ok();
-    }
-
-    /**
-     * 发送消息Kafka
-     *
-     * @param msg 消息内容
-     */
-    @GetMapping("/sendKafka")
-    public R<Void> sendKafka(String msg) {
-        logStreamProducer.streamLogMsg(msg);
-        return R.ok();
-    }
-
 }
 }

+ 4 - 3
nexo-example/nexo-stream-mq/src/main/java/com/nexo/stream/mq/consumer/DelayConsumer.java

@@ -1,7 +1,6 @@
 package com.nexo.stream.mq.consumer;
 package com.nexo.stream.mq.consumer;
 
 
 
 
-import com.nexo.stream.mq.TestMessaging;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Bean;
 import org.springframework.stereotype.Component;
 import org.springframework.stereotype.Component;
@@ -13,10 +12,12 @@ import java.util.function.Consumer;
 public class DelayConsumer {
 public class DelayConsumer {
 
 
     @Bean
     @Bean
-    Consumer<TestMessaging> delay() {
+    Consumer<String> delay() {
         log.info("初始化订阅");
         log.info("初始化订阅");
         return obj -> {
         return obj -> {
-            log.info("消息接收成功:" + obj);
+            log.info("消息接收成功:" + obj.toString());
         };
         };
     }
     }
+
+
 }
 }

+ 0 - 22
nexo-example/nexo-stream-mq/src/main/java/com/nexo/stream/mq/consumer/LogStreamConsumer.java

@@ -1,22 +0,0 @@
-package com.nexo.stream.mq.consumer;
-
-import com.nexo.stream.mq.TestMessaging;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.context.annotation.Bean;
-import org.springframework.stereotype.Component;
-
-import java.util.function.Consumer;
-
-@Slf4j
-@Component
-public class LogStreamConsumer {
-
-    @Bean
-    Consumer<TestMessaging> log() {
-        log.info("初始化订阅");
-        return msg -> {
-            log.info("通过stream消费到消息 => {}", msg.toString());
-        };
-    }
-
-}

+ 0 - 22
nexo-example/nexo-stream-mq/src/main/java/com/nexo/stream/mq/consumer/TestStreamConsumer.java

@@ -1,22 +0,0 @@
-package com.nexo.stream.mq.consumer;
-
-import com.nexo.stream.mq.TestMessaging;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.context.annotation.Bean;
-import org.springframework.stereotype.Component;
-
-import java.util.function.Consumer;
-
-@Slf4j
-@Component
-public class TestStreamConsumer {
-
-    @Bean
-    Consumer<TestMessaging> demo() {
-        log.info("初始化订阅");
-        return msg -> {
-            log.info("通过stream消费到消息 => {}", msg.toString());
-        };
-    }
-
-}

+ 0 - 24
nexo-example/nexo-stream-mq/src/main/java/com/nexo/stream/mq/producer/LogStreamProducer.java

@@ -1,24 +0,0 @@
-package com.nexo.stream.mq.producer;
-
-import com.nexo.stream.mq.TestMessaging;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.cloud.stream.function.StreamBridge;
-import org.springframework.messaging.support.MessageBuilder;
-import org.springframework.stereotype.Component;
-
-import java.util.UUID;
-
-@Component
-public class LogStreamProducer {
-
-    @Autowired
-    private StreamBridge streamBridge;
-
-    public void streamLogMsg(String msg) {
-        // 构建消息对象
-        TestMessaging testMessaging = new TestMessaging()
-            .setMsgId(UUID.randomUUID().toString())
-            .setMsgText(msg);
-        streamBridge.send("log-out-0", MessageBuilder.withPayload(testMessaging).build());
-    }
-}

+ 0 - 24
nexo-example/nexo-stream-mq/src/main/java/com/nexo/stream/mq/producer/TestStreamProducer.java

@@ -1,24 +0,0 @@
-package com.nexo.stream.mq.producer;
-
-import com.nexo.stream.mq.TestMessaging;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.cloud.stream.function.StreamBridge;
-import org.springframework.messaging.support.MessageBuilder;
-import org.springframework.stereotype.Component;
-
-import java.util.UUID;
-
-@Component
-public class TestStreamProducer {
-
-    @Autowired
-    private StreamBridge streamBridge;
-
-    public void streamTestMsg(String msg) {
-        // 构建消息对象
-        TestMessaging testMessaging = new TestMessaging()
-            .setMsgId(UUID.randomUUID().toString())
-            .setMsgText(msg);
-        streamBridge.send("demo-out-0", MessageBuilder.withPayload(testMessaging).build());
-    }
-}

+ 2 - 79
nexo-example/nexo-stream-mq/src/main/resources/application.yml

@@ -1,5 +1,5 @@
 server:
 server:
-  port: 9402
+  port: 9412
 
 
 # Spring
 # Spring
 spring:
 spring:
@@ -9,84 +9,6 @@ spring:
   profiles:
   profiles:
     # 环境配置
     # 环境配置
     active: @profiles.active@
     active: @profiles.active@
-  cloud:
-    stream:
-      function:
-        # 重点配置 与 binding 名与消费者对应
-        definition: delay;demo;log
-
---- # rabbitmq 配置
-spring:
-  rabbitmq:
-    host: localhost
-    port: 5672
-    username: root
-    password: root
-  cloud:
-    stream:
-      rabbit:
-        bindings:
-          delay-in-0:
-            consumer:
-              delayedExchange: true
-          delay-out-0:
-            producer:
-              delayedExchange: true
-      bindings:
-        delay-in-0:
-          destination: delay.exchange.cloud
-          content-type: application/json
-          group: delay-group
-          binder: rabbit
-        delay-out-0:
-          destination: delay.exchange.cloud
-          content-type: application/json
-          group: delay-group
-          binder: rabbit
-
---- # rocketmq 配置
-spring:
-  cloud:
-    stream:
-      rocketmq:
-        binder:
-          # rocketmq 地址
-          name-server: localhost:9876
-        bindings:
-          demo-out-0:
-            producer:
-              # 必须得写
-              group: default
-      bindings:
-        demo-out-0:
-          content-type: application/json
-          destination: stream-test-topic
-          group: test-group
-          binder: rocketmq
-        demo-in-0:
-          content-type: application/json
-          destination: stream-test-topic
-          group: test-group
-          binder: rocketmq
-
---- # kafka 配置
-spring:
-  cloud:
-    stream:
-      kafka:
-        binder:
-          brokers: localhost:9092
-      bindings:
-        log-out-0:
-          destination: stream-log-topic
-          contentType: application/json
-          group: log_group
-          binder: kafka
-        log-in-0:
-          destination: stream-log-topic
-          contentType: application/json
-          group: log_group
-          binder: kafka
 
 
 --- # nacos 配置
 --- # nacos 配置
 spring:
 spring:
@@ -105,3 +27,4 @@ spring:
   config:
   config:
     import:
     import:
       - optional:nacos:application-common.yml
       - optional:nacos:application-common.yml
+      - optional:nacos:${spring.application.name}.yml

+ 1 - 16
nexo-visual/nexo-seata-server/src/main/resources/application.yml

@@ -12,12 +12,6 @@ logging:
   config: classpath:logback-spring.xml
   config: classpath:logback-spring.xml
   file:
   file:
     path: ./logs/seata
     path: ./logs/seata
-#  extend:
-#    logstash-appender:
-#      destination: 127.0.0.1:4560
-#    kafka-appender:
-#      bootstrap-servers: 127.0.0.1:9092
-#      topic: logback_to_logstash
 
 
 console:
 console:
   user:
   user:
@@ -32,11 +26,6 @@ seata:
       server-addr: @nacos.server@
       server-addr: @nacos.server@
       group: @nacos.discovery.group@
       group: @nacos.discovery.group@
       namespace: ${spring.profiles.active}
       namespace: ${spring.profiles.active}
-      username:
-      password:
-      ##if use MSE Nacos with auth, mutex with username/password attribute
-      #access-key: ""
-      #secret-key: ""
       data-id: seata-server.properties
       data-id: seata-server.properties
   registry:
   registry:
     # support: nacos 、 eureka 、 redis 、 zk  、 consul 、 etcd3 、 sofa
     # support: nacos 、 eureka 、 redis 、 zk  、 consul 、 etcd3 、 sofa
@@ -47,11 +36,7 @@ seata:
       group: @nacos.discovery.group@
       group: @nacos.discovery.group@
       namespace: ${spring.profiles.active}
       namespace: ${spring.profiles.active}
       cluster: default
       cluster: default
-      username:
-      password:
-      ##if use MSE Nacos with auth, mutex with username/password attribute
-      #access-key: ""
-      #secret-key: ""
+
   security:
   security:
     secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017
     secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017
     tokenValidityInMilliseconds: 1800000
     tokenValidityInMilliseconds: 1800000