SRS 4.0 HTTP回调实战:Spring Boot 2.3.7 实现7种事件鉴权与日志记录
流媒体服务在现代互联网应用中扮演着越来越重要的角色,而SRS(Simple RTMP Server)作为一款开源的流媒体服务器,因其高性能和易扩展性受到广泛关注。本文将深入探讨SRS 4.0的HTTP回调机制在Spring Boot中的完整实现方案,涵盖7种核心回调事件的接口开发、统一鉴权逻辑、日志记录方案以及高可用配置。
1. SRS HTTP回调机制概述
HTTP回调是SRS提供的一种灵活的事件通知机制,它允许开发者在特定事件发生时(如客户端连接、发布流、播放流等),通过HTTP请求将事件数据推送到自定义的业务服务器。这种机制为流媒体业务提供了强大的扩展能力,使得我们可以在不修改SRS核心代码的情况下,实现复杂的业务逻辑。
SRS 4.0支持的主要回调事件包括:
- on_connect:客户端连接到SRS服务器时触发
- on_publish:客户端发布流时触发
- on_play:客户端播放流时触发
- on_stop:客户端停止播放时触发
- on_unpublish:客户端停止发布流时触发
- on_close:客户端断开连接时触发
- on_hls:生成HLS切片文件时触发
每种回调事件都会携带特定的JSON格式数据,业务服务器需要处理这些数据并返回规定的响应格式。典型的回调数据格式如下:
{ "action": "on_publish", "client_id": 1985, "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", "stream": "livestream", "param": "?token=xxx&salt=yyy" }2. Spring Boot项目结构与配置
2.1 项目初始化与依赖配置
我们使用Spring Boot 2.3.7作为后端框架,首先创建项目并配置必要的依赖。pom.xml关键配置如下:
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-thymeleaf</artifactId> </dependency> <dependency> <groupId>com.auth0</groupId> <artifactId>java-jwt</artifactId> <version>3.2.0</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> </dependencies>2.2 安全配置与HTTPS支持
由于回调接口需要暴露给SRS服务器调用,我们建议启用HTTPS保证通信安全。在application.properties中配置:
server.port=400 server.ssl.key-store=tomcat.keystore server.ssl.key-store-password=123456 server.ssl.keyStoreType=JKS server.ssl.keyAlias=tomcat同时配置Web安全规则,允许跨域访问回调接口:
@Configuration public class WebConfig implements WebMvcConfigurer { @Override public void addResourceHandlers(ResourceHandlerRegistry registry) { registry.addResourceHandler("/**") .addResourceLocations("classpath:/"); } @Override public void addCorsMappings(CorsRegistry registry) { registry.addMapping("/**") .allowedOrigins("*") .allowedMethods("*"); } }3. 回调接口设计与实现
3.1 统一回调数据模型
为方便处理不同类型的回调事件,我们首先定义统一的回调数据模型:
@Data public class CallbackData { private String action; private String client_id; private String ip; private String vhost; private String app; private String stream; private String param; private String tcUrl; private String pageUrl; private String server_id; }3.2 回调控制器实现
创建统一的回调控制器,处理所有类型的回调事件:
@RestController @RequestMapping("/api/callback") public class CallbackController { private static final Logger logger = LoggerFactory.getLogger(CallbackController.class); @PostMapping("/on_connect") public int handleConnect(@RequestBody CallbackData data) { logger.info("Connect event: {}", data); // 鉴权逻辑 return 0; // 0表示成功,非0表示失败 } @PostMapping("/on_publish") public int handlePublish(@RequestBody CallbackData data) { logger.info("Publish event: {}", data); // 发布流鉴权 return validateStream(data) ? 0 : 1; } @PostMapping("/on_play") public int handlePlay(@RequestBody CallbackData data) { logger.info("Play event: {}", data); // 播放流鉴权 return validatePlay(data) ? 0 : 1; } // 其他回调方法... private boolean validateStream(CallbackData data) { // 实现具体的流发布鉴权逻辑 return true; } private boolean validatePlay(CallbackData data) { // 实现具体的流播放鉴权逻辑 return true; } }3.3 统一鉴权机制
我们采用JWT(JSON Web Token)作为鉴权机制,实现统一的Token验证工具类:
public class TokenUtils { private static final long EXPIRE_TIME = 60 * 60 * 1000; // 1小时 private static final String SECRET = "your-secret-key"; public static boolean verify(String token, String username) { try { Algorithm algorithm = Algorithm.HMAC256(SECRET); JWTVerifier verifier = JWT.require(algorithm) .withClaim("username", username) .build(); verifier.verify(token); return true; } catch (Exception e) { return false; } } public static String getUsername(String token) { try { DecodedJWT jwt = JWT.decode(token); return jwt.getClaim("username").asString(); } catch (JWTDecodeException e) { return null; } } public static String sign(String username) { Date date = new Date(System.currentTimeMillis() + EXPIRE_TIME); Algorithm algorithm = Algorithm.HMAC256(SECRET); return JWT.create() .withClaim("username", username) .withExpiresAt(date) .sign(algorithm); } }4. SRS服务器配置
4.1 回调配置示例
在SRS的配置文件srs.conf中,我们需要为每种回调事件配置对应的HTTP接口地址:
vhost __defaultVhost__ { http_hooks { enabled on; on_connect http://your-domain:400/api/callback/on_connect; on_close http://your-domain:400/api/callback/on_close; on_publish http://your-domain:400/api/callback/on_publish; on_unpublish http://your-domain:400/api/callback/on_unpublish; on_play http://your-domain:400/api/callback/on_play; on_stop http://your-domain:400/api/callback/on_stop; on_hls http://your-domain:400/api/callback/on_hls; } }4.2 高可用配置建议
为确保回调服务的高可用性,我们建议:
- 多实例部署:部署多个Spring Boot实例,使用Nginx进行负载均衡
- 心跳检测:实现健康检查接口,确保服务可用
- 重试机制:在SRS配置中设置多个回调地址,以空格分隔
- 超时设置:合理设置HTTP请求超时时间,避免阻塞SRS主线程
5. 日志记录与监控
5.1 日志记录方案
我们使用Spring Boot默认的Logback日志框架,配置详细的回调事件日志:
<!-- logback-spring.xml --> <configuration> <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender"> <file>logs/srs-callback.log</file> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <fileNamePattern>logs/srs-callback.%d{yyyy-MM-dd}.log</fileNamePattern> <maxHistory>30</maxHistory> </rollingPolicy> <encoder> <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern> </encoder> </appender> <root level="INFO"> <appender-ref ref="FILE" /> </root> </configuration>5.2 监控指标
通过Spring Boot Actuator暴露监控端点,实时掌握回调服务状态:
# application.properties management.endpoints.web.exposure.include=health,metrics,prometheus management.metrics.tags.application=srs-callback-service6. 实战案例:流发布鉴权
让我们以on_publish回调为例,详细实现一个完整的流发布鉴权流程:
@PostMapping("/on_publish") public int handlePublish(@RequestBody CallbackData data) { logger.info("Publish request received: {}", data); // 1. 基础参数校验 if (StringUtils.isEmpty(data.getApp()) || !"live".equals(data.getApp())) { logger.warn("Invalid app name: {}", data.getApp()); return 1; } // 2. Token验证 if (StringUtils.isEmpty(data.getParam()) || !data.getParam().startsWith("?token=")) { logger.warn("Missing token parameter"); return 1; } String token = data.getParam().substring(7); // 去掉"?token="前缀 String username = TokenUtils.getUsername(token); if (username == null || !TokenUtils.verify(token, username)) { logger.warn("Invalid token for user: {}", username); return 1; } // 3. 业务规则校验(例如:用户是否有发布权限) if (!userService.hasPublishPermission(username, data.getStream())) { logger.warn("User {} has no permission to publish stream {}", username, data.getStream()); return 1; } logger.info("Publish authorized for user {} on stream {}", username, data.getStream()); return 0; }7. 性能优化与最佳实践
在实际生产环境中,我们需要考虑以下优化措施:
- 异步处理:对于非关键路径的逻辑,使用
@Async异步处理 - 缓存机制:缓存频繁验证的Token和权限信息
- 批量操作:对于HLS切片回调等高频事件,考虑批量处理
- 连接池优化:调整HTTP客户端连接池参数
@Configuration public class AsyncConfig implements AsyncConfigurer { @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setMaxPoolSize(50); executor.setQueueCapacity(100); executor.setThreadNamePrefix("SrsCallbackAsync-"); executor.initialize(); return executor; } }通过本文的实践,我们构建了一个完整的SRS HTTP回调处理系统,实现了7种核心事件的鉴权与日志记录功能。这种架构不仅提高了流媒体服务的安全性,也为后续的业务扩展提供了坚实的基础。