// RoomOperationServiceImpl.java 14.7 KB
package com.wondertek.service.impl;

import cn.hutool.core.util.ObjectUtil;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.fasterxml.jackson.core.type.TypeReference;
import com.wondertek.dto.BlockParam;
import com.wondertek.dto.DelayParam;
import com.wondertek.dto.MarkParam;
import com.wondertek.entity.LiveMonitorRoom;
import com.wondertek.entity.MonitorMark;
import com.wondertek.entity.OperationLog;
import com.wondertek.entity.StreamTask;
import com.wondertek.enums.GlobalCodeEnum;
import com.wondertek.enums.RoomOperationEnum;
import com.wondertek.exception.BusinessException;
import com.wondertek.exception.ServiceException;
import com.wondertek.mapper.LiveMonitorRoomMapper;
import com.wondertek.mapper.MonitorMarkMapper;
import com.wondertek.mapper.StreamTaskMapper;
import com.wondertek.service.OperationLogService;
import com.wondertek.service.RoomOperationSerivice;
import com.wondertek.util.JSONUtils;
import com.wondertek.util.ResponseData;
import com.wondertek.util.ResultBean;
import com.wondertek.vo.RoomBlock;
import com.wondertek.vo.RoomMark;
import com.wondertek.vo.RoomOperationVo;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @Description:
 * @Author W5669
 * @Create 2025/7/22
 * @Version 1.0
 */
@Service
@Slf4j
public class RoomOperationServiceImpl implements RoomOperationSerivice {
    @Resource
    private LiveMonitorRoomMapper liveMonitorRoomMapper;
    @Resource
    private StreamTaskMapper streamTaskMapper;
    @Value("${transcode.delayTimeUrl}")
    private String delayTimeUrl;
    @Value("${transcode.blockStatusUrl}")
    private String blockStatusUrl;
    @Resource
    private MonitorMarkMapper monitorMarkMapper;
    @Resource
    private OperationLogService operationLogService;

    @Override
    public ResultBean setDelayTime(DelayParam delayParam) {
        LiveMonitorRoom liveMonitorRoom = liveMonitorRoomMapper.selectById(delayParam.getRoomId());
        //日志
        OperationLog opLog = new OperationLog();
        opLog.setRoomName(liveMonitorRoom.getName());
        opLog.setBusinessId(delayParam.getRoomId().toString());
        opLog.setBusinessType("cloudMonitor");
        opLog.setOperationType(RoomOperationEnum.DELAY.getCode());
        opLog.setCreatedBy("admin");
        opLog.setCreatedTime(LocalDateTime.now());
        opLog.setRequestUrl(delayTimeUrl);

        LambdaQueryWrapper<StreamTask> wrapper = new LambdaQueryWrapper<>();
        wrapper.eq(StreamTask::getRoomId,delayParam.getRoomId());
        List<StreamTask> streamTasks = streamTaskMapper.selectList(wrapper);
        //1.依次取出延时1、延时2、播出任务,设置转码平台相应的延时时间
        for (StreamTask streamTask : streamTasks){
            if (streamTask.getPlayType().equals("delay1")){
                ResponseData responseData = delayTask(streamTask.getTaskId(), delayParam.getDelayFirst());
                if (!responseData.isSuccess()){
                    opLog.setStatus("1");
                    opLog.setReponseResult(JSONUtils.obj2json(responseData));
                    return ResultBean.error();
                }
                //更新任务
                streamTask.setDelayTime(delayParam.getDelayFirst());
                streamTaskMapper.updateById(streamTask);
            }
            if (streamTask.getPlayType().equals("delay2")){
                ResponseData responseData = delayTask(streamTask.getTaskId(),delayParam.getDelaySecond());
                if (!responseData.isSuccess()){
                    opLog.setStatus("1");
                    opLog.setReponseResult(JSONUtils.obj2json(responseData));
                    return ResultBean.error();
                }
                //更新任务
                streamTask.setDelayTime(delayParam.getDelaySecond());
                streamTaskMapper.updateById(streamTask);
            }
            if (streamTask.getPlayType().equals("play")){
                ResponseData responseData = delayTask(streamTask.getTaskId(),delayParam.getDelayPlay());
                if (!responseData.isSuccess()){
                    opLog.setStatus("1");
                    opLog.setReponseResult(JSONUtils.obj2json(responseData));
                    return ResultBean.error();
                }
                //更新任务
                streamTask.setDelayTime(delayParam.getDelayPlay());
                streamTaskMapper.updateById(streamTask);
            }
        }
        opLog.setStatus("0");
        opLog.setReponseResult(JSONUtils.obj2json(ResponseData.ok()));
        operationLogService.save(opLog);
        return ResultBean.ok("设置成功");
    }

    public ResponseData delayTask(String taskId, int delayTime){
        Map<String, Object> paramsMap = new HashMap<>();
        paramsMap.put("taskId", taskId);
        paramsMap.put("delayTime", delayTime);
        HttpResponse execute = null;
        try {
            execute = HttpRequest.put(delayTimeUrl)
                    .header("Content-Type", "application/json")
                    .body(JSONUtils.obj2json(paramsMap)).timeout(30000).execute();
            if (execute.isOk()) {
                log.info("-->请求转码系统延时接口,url:{},paramsMap:{}",delayTimeUrl,paramsMap);
                String body = execute.body();
                ResponseData response = JSONUtils.jsonToObject(body, new TypeReference<ResponseData>() {
                });
                if (response.getResultCode().equals("0")) {
                    log.info("请求转码系统延时接口成功");
                }else {
                    log.info("请求转码系统延时接口响应失败,response:{}",response);
                }
                return response;
            } else {
                log.error("请求转码系统延时接口失败,url:{},body:{}", delayTimeUrl, execute.body());
                return ResponseData.error("请求转码系统延时接口失败");
            }
        } catch (Exception e) {
            log.error("请求转码系统延时接口失败,url:{}", delayTimeUrl, e);
            return ResponseData.error("请求转码系统延时接口失败");
        }
    }

    @Override
    public ResultBean blockStatus(BlockParam blockParam) {
        Long start = System.currentTimeMillis();

        LiveMonitorRoom liveMonitorRoom = liveMonitorRoomMapper.selectById(blockParam.getRoomId());
        if (ObjectUtil.isNull(liveMonitorRoom)){
            return ResultBean.error("审片间不存在");
        }
        //校验当前屏蔽、恢复状态
        if (blockParam.getBlockStatus().toString().equals(liveMonitorRoom.getBrocStatus())){
            String message = blockParam.getBlockStatus().equals(0)?"恢复":"屏蔽";
            throw new ServiceException("当前审片间已处于"+ message +"状态");
        }

        //获取当前播出流
        LambdaQueryWrapper<StreamTask> wrapper = new LambdaQueryWrapper<>();
        wrapper.eq(StreamTask::getRoomId,blockParam.getRoomId());
        wrapper.eq(StreamTask::getPlayType,"play");
        List<StreamTask> streamTasks = streamTaskMapper.selectList(wrapper);
        if (CollectionUtils.isEmpty(streamTasks)){
            return ResultBean.error("当前审片间没有播放任务");
        }

        //日志
        OperationLog opLog = new OperationLog();
        opLog.setRoomName(liveMonitorRoom.getName());
        opLog.setBusinessId(blockParam.getRoomId().toString());
        opLog.setBusinessType("cloudMonitor");
        opLog.setOperationType(blockParam.getBlockStatus().equals(0)?
                RoomOperationEnum.BLOCK.getCode():RoomOperationEnum.RECOVER.getCode());
        opLog.setCreatedBy("admin");
        opLog.setCreatedTime(LocalDateTime.now());
        opLog.setRequestUrl(blockStatusUrl);

        log.info("-->审片间屏蔽前置逻辑耗时:{}ms",System.currentTimeMillis()-start);
        streamTasks.forEach(streamTask -> {
            //调用屏蔽接口
            Map<String, Object> paramsMap = new HashMap<>();
            paramsMap.put("taskId", streamTask.getTaskId());
            paramsMap.put("blockStatus", blockParam.getBlockStatus());
            HttpResponse execute = null;
            try {
                long tag1 = System.currentTimeMillis();
                execute = HttpRequest.put(blockStatusUrl)
                        .header("Content-Type", "application/json")
                        .body(JSONUtils.obj2json(paramsMap)).timeout(30000).execute();
                if (execute.isOk()) {
                    log.info("-->请求转码系统屏蔽/恢复接口,url:{},paramsMap:{}",blockStatusUrl,paramsMap);
                    log.info("请求转码系统屏蔽/恢复接口,请求耗时:{}ms",System.currentTimeMillis()-tag1);
                    String body = execute.body();
                    ResponseData response = JSONUtils.jsonToObject(body, new TypeReference<ResponseData>() {
                    });
                    if (response.getResultCode().equals("0")) {
                        log.info("请求转码系统屏蔽/恢复接口成功");
                        if(streamTask.getTaskType().equals("0")){
                            opLog.setStatus(GlobalCodeEnum.SUCCESS.getCode());
                            opLog.setReponseResult(JSONUtils.obj2json(response));
                        }
                    }else {
                        log.info("请求转码系统屏蔽/恢复接口响应失败,response:{}",response);
                        if(streamTask.getTaskType().equals("0")){
                            opLog.setStatus(GlobalCodeEnum.FAILURE.getCode());
                            opLog.setReponseResult(JSONUtils.obj2json(response));
                        }
                        throw new BusinessException("block_status_error", "屏蔽状态错误");
                    }
                } else {
                    log.error("请求转码系统屏蔽/恢复接口失败,url:{},body:{}", blockStatusUrl, execute.body());
                    opLog.setStatus(GlobalCodeEnum.FAILURE.getCode());
                    throw new BusinessException("block_status_error", "屏蔽状态错误");
                }
            } catch (Exception e) {
                log.error("请求转码系统屏蔽/恢复接口失败,url:{}", blockStatusUrl, e);
                opLog.setStatus(GlobalCodeEnum.FAILURE.getCode());
                throw new BusinessException("block_status_error", "屏蔽状态错误");
            }
        });
        //保存操作日志
        operationLogService.save(opLog);
        //保存播控操作记录
        MonitorMark monitorMark = new MonitorMark();
        monitorMark.setRoomId(blockParam.getRoomId());
        monitorMark.setCreatedTime(LocalDateTime.now());
        monitorMark.setCreatedBy("admin");
        monitorMark.setOperationType(blockParam.getBlockStatus()==0 ? RoomOperationEnum.RECOVER.getCode() : RoomOperationEnum.BLOCK.getCode());
        monitorMark.setOperationTime(LocalDateTime.now());
        monitorMarkMapper.insert(monitorMark);

        liveMonitorRoom.setBrocStatus(String.valueOf(blockParam.getBlockStatus()));
        liveMonitorRoomMapper.updateById(liveMonitorRoom);

        return ResultBean.ok("设置成功");
    }

    @Override
    public ResultBean addMark(MarkParam markParam) {
        //校验是否重复操作
        LambdaQueryWrapper<MonitorMark> wrapper = new LambdaQueryWrapper<>();
        wrapper.eq(MonitorMark::getRoomId, markParam.getRoomId())
                .in(MonitorMark::getOperationType, RoomOperationEnum.MARK_START.getCode(), RoomOperationEnum.MARK_EDN.getCode())
                .orderByDesc(MonitorMark::getCreatedTime).last("limit 1");
        MonitorMark lastMark = monitorMarkMapper.selectOne(wrapper);
        if(lastMark != null && lastMark.getOperationType().equals(markParam.getOperationType())){
            return ResultBean.repeat(lastMark);
        }

        MonitorMark monitorMark = new MonitorMark();
        monitorMark.setRoomId(markParam.getRoomId());
        monitorMark.setOperationType(markParam.getOperationType());
        monitorMark.setCreatedTime(LocalDateTime.now());
        monitorMark.setOperationTime(markParam.getMarkTime());
        monitorMark.setCreatedBy("admin");
        monitorMarkMapper.insert(monitorMark);
        //日志
        OperationLog opLog = new OperationLog();
        opLog.setRoomName(markParam.getRoomName());
        opLog.setBusinessId(markParam.getRoomId().toString());
        opLog.setBusinessType("cloudMonitor");
        opLog.setOperationType(markParam.getOperationType());
        opLog.setCreatedBy("admin");
        opLog.setCreatedTime(LocalDateTime.now());
        opLog.setRequestUrl(delayTimeUrl);
        opLog.setStatus("0");
        operationLogService.save(opLog);


        return ResultBean.ok(monitorMark);
    }

    @Override
    public ResultBean operationList(Long roomId) {
        LambdaQueryWrapper<MonitorMark> wrapper = new LambdaQueryWrapper<>();
        wrapper.eq(MonitorMark::getRoomId, roomId);
        List<MonitorMark> monitorMarks = monitorMarkMapper.selectList(wrapper);
        RoomOperationVo roomOperationVo = new RoomOperationVo();
        ArrayList<RoomMark> markList = new ArrayList<>();
        ArrayList<RoomBlock> blockList = new ArrayList<>();
        if(CollectionUtils.isNotEmpty(monitorMarks)){
            //根据operationType处理数据
            monitorMarks.forEach(monitorMark -> {
                if (monitorMark.getOperationType().equals(RoomOperationEnum.MARK_START.getCode())
                        ||monitorMark.getOperationType().equals(RoomOperationEnum.MARK_EDN.getCode())){
                    RoomMark roomMark = new RoomMark();
                    roomMark.setId(monitorMark.getId());
                    roomMark.setOperationTime(monitorMark.getOperationTime());
                    roomMark.setOperationType(monitorMark.getOperationType());
                    markList.add(roomMark);
                }else {
                    RoomBlock roomBlock = new RoomBlock();
                    roomBlock.setId(monitorMark.getId());
                    roomBlock.setOperationTime(monitorMark.getOperationTime());
                    roomBlock.setOperationType(monitorMark.getOperationType());
                    blockList.add(roomBlock);
                }
            });
            roomOperationVo.setMarkList(markList);
            roomOperationVo.setBlockList(blockList);
        }
        roomOperationVo.setMarkList(markList);
        roomOperationVo.setBlockList(blockList);
        return ResultBean.ok(roomOperationVo);
    }
}