diff --git a/example/default/config.yaml b/example/default/config.yaml index 2f858d1f..f48de3d5 100755 --- a/example/default/config.yaml +++ b/example/default/config.yaml @@ -1,11 +1,25 @@ global: + http: + listenaddr: :8080 location: "^/hdl/(.*)": "/flv/$1" # 兼容 v4 "^/stress/api/(.*)": "/test/api/stress/$1" # 5.0.x "^/monitor/(.*)": "/debug/$1" # 5.0.x - loglevel: debug + loglevel: info admin: enablelogin: false + # S3存储配置示例(需要时取消注释并填写实际值) + # storage: + # s3: + # endpoint: "s3.amazonaws.com" # S3服务端点 + # region: "us-east-1" # AWS区域 + # accessKeyID: "your-access-key-id" # 访问密钥ID + # secretAccessKey: "your-secret-access-key" # 秘密访问密钥 + # bucket: "your-bucket-name" # 存储桶名称 + # pathPrefix: "monibuca/recordings" # 文件路径前缀 + # forcePathStyle: false # 强制路径样式(MinIO需要设为true) + # useSSL: true # 是否使用SSL + # timeout: "30s" # 上传超时时间 # pullproxy: # - id: 1 # 唯一ID标识,必须大于0 # name: "camera-1" # 拉流代理名称 @@ -57,23 +71,23 @@ gb28181: - platformservergbid: "34020000002000000002" #上级平台GBID channeldbid: "34020000001110000003_34020000001320000005" #通道DBID,格式为设备ID_通道ID mp4: - # enable: false + enable: true # publish: # delayclosetimeout: 3s # onpub: # record: - # ^live/.+: - # fragment: 10s - # filepath: record/$0 - # storage: - # s3: - # endpoint: "storage-dev.xiding.tech" - # accessKeyId: "xidinguser" - # secretAccessKey: "U2FsdGVkX1/7uyvj0trCzSNFsfDZ66dMSAEZjNlvW1c=" - # bucket: "vidu-media-bucket" - # pathPrefix: "" - # forcePathStyle: true - # useSSL: true + # ^live/(.+)$: + # filepath: s3rec/$0 + # filename: $0.mp4 + # storage: + # s3: + # endpoint: "storage-dev.xiding.tech" + # accessKeyId: "xidinguser" + # secretAccessKey: "U2FsdGVkX1/7uyvj0trCzSNFsfDZ66dMSAEZjNlvW1c=" + # bucket: "vidu-media-bucket" + # pathPrefix: "recordings" + # forcePathStyle: true + # useSSL: true # pull: # live/test: /Users/dexter/Movies/1744963190.mp4 onsub: @@ -90,8 +104,16 @@ flv: # onpub: # record: # ^live/.+: - # fragment: 1m # filepath: record/$0 + # storage: + # s3: + # endpoint: "s3.amazonaws.com" + # accessKeyId: "your-access-key-id" + # secretAccessKey: "your-secret-access-key" + # bucket: "your-bucket-name" + # pathPrefix: "flv-recordings" + # forcePathStyle: false + # useSSL: true publish: delayclosetimeout: 3s onsub: diff --git a/pkg/config/types.go b/pkg/config/types.go index fbf5b8f9..8901a130 100755 --- a/pkg/config/types.go +++ b/pkg/config/types.go @@ -123,8 +123,9 @@ type ( } Record struct { Mode RecordMode `json:"mode" desc:"事件类型,auto=连续录像模式,event=事件录像模式" gorm:"type:varchar(255);comment:事件类型,auto=连续录像模式,event=事件录像模式;default:'auto'"` - Type string `desc:"录制类型"` // 录制类型 mp4、flv、hls、hlsv7 - FilePath string `desc:"录制文件路径"` // 录制文件路径 + Type string `desc:"录制类型"` // 录制类型 mp4、flv、hls、hlsv7 + FilePath string `desc:"录制文件路径"` // 录制文件路径 + FileName string `json:"fileName" desc:"录制文件名" gorm:"-"` Fragment time.Duration `desc:"分片时长"` // 分片时长 RealTime bool `desc:"是否实时录制"` // 是否实时录制 Append bool `desc:"是否追加录制"` // 是否追加录制 diff --git a/plugin/flv/pkg/record.go b/plugin/flv/pkg/record.go index 9977154e..2ef4f305 100644 --- a/plugin/flv/pkg/record.go +++ b/plugin/flv/pkg/record.go @@ -6,6 +6,7 @@ import ( "io" "os" "path/filepath" + "strings" "time" task "github.com/langhuihui/gotask" @@ -148,6 +149,12 @@ type Recorder struct { } var CustomFileName = func(job *m7s.RecordJob) string { + if fn := job.RecConf.FileName; fn != "" { + if !strings.HasSuffix(strings.ToLower(fn), ".flv") { + fn = fn + ".flv" + } + return filepath.Join(job.RecConf.FilePath, fn) + } if job.RecConf.Fragment == 0 || job.RecConf.Append { return fmt.Sprintf("%s.flv", job.RecConf.FilePath) } @@ -160,6 +167,7 @@ func (r *Recorder) createStream(start time.Time) (err error) { if err != nil { return } + r.Debug("flv create file", "filePath", r.Event.FilePath) // 获取存储实例 st := r.RecordJob.GetStorage() diff --git a/plugin/hls/pkg/record.go b/plugin/hls/pkg/record.go index c4b21668..19e79223 100644 --- a/plugin/hls/pkg/record.go +++ b/plugin/hls/pkg/record.go @@ -1,10 +1,11 @@ package hls import ( - "context" - "fmt" - "path/filepath" - "time" + "context" + "fmt" + "strings" + "path/filepath" + "time" "m7s.live/v5" "m7s.live/v5/pkg/codec" @@ -28,15 +29,26 @@ type Recorder struct { } var CustomFileName = func(job *m7s.RecordJob) string { - if job.RecConf.Fragment == 0 || job.RecConf.Append { - return fmt.Sprintf("%s/%s.ts", job.RecConf.FilePath, time.Now().Format("20060102150405")) - } - return filepath.Join(job.RecConf.FilePath, time.Now().Format("20060102150405")+".ts") + if fn := job.RecConf.FileName; fn != "" { + if !strings.HasSuffix(strings.ToLower(fn), ".ts") { + fn = fn + ".ts" + } + return filepath.Join(job.RecConf.FilePath, fn) + } + if job.RecConf.Fragment == 0 || job.RecConf.Append { + return fmt.Sprintf("%s/%s.ts", job.RecConf.FilePath, time.Now().Format("20060102150405")) + } + return filepath.Join(job.RecConf.FilePath, time.Now().Format("20060102150405")+".ts") } func (r *Recorder) createStream(start time.Time) (err error) { - r.RecordJob.RecConf.Type = "ts" - return r.CreateStream(start, CustomFileName) + r.RecordJob.RecConf.Type = "ts" + err = r.CreateStream(start, CustomFileName) + if err != nil { + return err + } + r.Debug("ts create file", "filePath", r.Event.FilePath) + return nil } func (r *Recorder) writeTailer(end time.Time) { diff --git a/plugin/mp4/api.go b/plugin/mp4/api.go index 954f9194..60ce8b97 100644 --- a/plugin/mp4/api.go +++ b/plugin/mp4/api.go @@ -386,7 +386,6 @@ func (p *MP4Plugin) download(w http.ResponseWriter, r *http.Request) { if err != nil { return } - p.Info("read", "file", file.Name()) // 创建解复用器并解析文件 demuxer := mp4.NewDemuxer(file) @@ -558,6 +557,7 @@ func (p *MP4Plugin) download(w http.ResponseWriter, r *http.Request) { func (p *MP4Plugin) StartRecord(ctx context.Context, req *mp4pb.ReqStartRecord) (res *mp4pb.ResponseStartRecord, err error) { var recordExists bool var filePath = "." + var fileName = "" var fragment = time.Minute if req.Fragment != nil { fragment = req.Fragment.AsDuration() @@ -565,9 +565,14 @@ func (p *MP4Plugin) StartRecord(ctx context.Context, req *mp4pb.ReqStartRecord) if req.FilePath != "" { filePath = req.FilePath } + if req.FileName != "" { + fileName = req.FileName + } + + p.Debug("mp4 plugin start record", "streamPath", req.StreamPath, "filePath", filePath, "fileName", fileName, "fragment", fragment) res = &mp4pb.ResponseStartRecord{} _, recordExists = p.Server.Records.Find(func(job *m7s.RecordJob) bool { - return job.StreamPath == req.StreamPath && job.RecConf.FilePath == req.FilePath + return job.StreamPath == req.StreamPath && job.RecConf.FilePath == req.FilePath && job.RecConf.FileName == req.FileName }) if recordExists { err = pkg.ErrRecordExists @@ -578,7 +583,9 @@ func (p *MP4Plugin) StartRecord(ctx context.Context, req *mp4pb.ReqStartRecord) Append: false, Fragment: fragment, FilePath: filePath, + FileName: fileName, } + var stream *m7s.Publisher var ok bool if stream, ok = p.Server.Streams.SafeGet(req.StreamPath); !ok { @@ -595,6 +602,7 @@ func (p *MP4Plugin) StartRecord(ctx context.Context, req *mp4pb.ReqStartRecord) } } job := p.Record(stream, recordConf, nil) + p.Debug("mp4 record job", "taskPtr", uint64(job.GetTaskPointer())) res.Data = uint64(job.GetTaskPointer()) err = job.WaitStarted() return diff --git a/plugin/mp4/pb/mp4.pb.go b/plugin/mp4/pb/mp4.pb.go index 843809ad..1d8c2bec 100644 --- a/plugin/mp4/pb/mp4.pb.go +++ b/plugin/mp4/pb/mp4.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.6 -// protoc v5.29.3 +// protoc-gen-go v1.36.10 +// protoc v6.33.1 // source: mp4.proto package pb @@ -366,6 +366,7 @@ type ReqStartRecord struct { StreamPath string `protobuf:"bytes,1,opt,name=streamPath,proto3" json:"streamPath,omitempty"` Fragment *durationpb.Duration `protobuf:"bytes,2,opt,name=fragment,proto3" json:"fragment,omitempty"` FilePath string `protobuf:"bytes,3,opt,name=filePath,proto3" json:"filePath,omitempty"` + FileName string `protobuf:"bytes,4,opt,name=fileName,proto3" json:"fileName,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -421,6 +422,13 @@ func (x *ReqStartRecord) GetFilePath() string { return "" } +func (x *ReqStartRecord) GetFileName() string { + if x != nil { + return x.FileName + } + return "" +} + type ResponseStartRecord struct { state protoimpl.MessageState `protogen:"open.v1"` Code int32 `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"` @@ -1096,13 +1104,14 @@ const file_mp4_proto_rawDesc = "" + "\x13ResponseEventRecord\x12\x12\n" + "\x04code\x18\x01 \x01(\x05R\x04code\x12\x18\n" + "\amessage\x18\x02 \x01(\tR\amessage\x12\x12\n" + - "\x04data\x18\x03 \x01(\rR\x04data\"\x83\x01\n" + + "\x04data\x18\x03 \x01(\rR\x04data\"\x9f\x01\n" + "\x0eReqStartRecord\x12\x1e\n" + "\n" + "streamPath\x18\x01 \x01(\tR\n" + "streamPath\x125\n" + "\bfragment\x18\x02 \x01(\v2\x19.google.protobuf.DurationR\bfragment\x12\x1a\n" + - "\bfilePath\x18\x03 \x01(\tR\bfilePath\"W\n" + + "\bfilePath\x18\x03 \x01(\tR\bfilePath\x12\x1a\n" + + "\bfileName\x18\x04 \x01(\tR\bfileName\"W\n" + "\x13ResponseStartRecord\x12\x12\n" + "\x04code\x18\x01 \x01(\x05R\x04code\x12\x18\n" + "\amessage\x18\x02 \x01(\tR\amessage\x12\x12\n" + diff --git a/plugin/mp4/pb/mp4.pb.gw.go b/plugin/mp4/pb/mp4.pb.gw.go index 3c0444ab..0b437289 100644 --- a/plugin/mp4/pb/mp4.pb.gw.go +++ b/plugin/mp4/pb/mp4.pb.gw.go @@ -44,7 +44,9 @@ func request_Api_List_0(ctx context.Context, marshaler runtime.Marshaler, client metadata runtime.ServerMetadata err error ) - io.Copy(io.Discard, req.Body) + if req.Body != nil { + _, _ = io.Copy(io.Discard, req.Body) + } val, ok := pathParams["streamPath"] if !ok { return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "streamPath") @@ -92,7 +94,9 @@ func request_Api_Catalog_0(ctx context.Context, marshaler runtime.Marshaler, cli protoReq emptypb.Empty metadata runtime.ServerMetadata ) - io.Copy(io.Discard, req.Body) + if req.Body != nil { + _, _ = io.Copy(io.Discard, req.Body) + } msg, err := client.Catalog(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD)) return msg, metadata, err } @@ -115,6 +119,9 @@ func request_Api_Delete_0(ctx context.Context, marshaler runtime.Marshaler, clie if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && !errors.Is(err, io.EOF) { return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) } + if req.Body != nil { + _, _ = io.Copy(io.Discard, req.Body) + } val, ok := pathParams["streamPath"] if !ok { return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "streamPath") @@ -156,6 +163,9 @@ func request_Api_EventStart_0(ctx context.Context, marshaler runtime.Marshaler, if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && !errors.Is(err, io.EOF) { return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) } + if req.Body != nil { + _, _ = io.Copy(io.Discard, req.Body) + } msg, err := client.EventStart(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD)) return msg, metadata, err } @@ -181,6 +191,9 @@ func request_Api_StartRecord_0(ctx context.Context, marshaler runtime.Marshaler, if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && !errors.Is(err, io.EOF) { return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) } + if req.Body != nil { + _, _ = io.Copy(io.Discard, req.Body) + } val, ok := pathParams["streamPath"] if !ok { return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "streamPath") @@ -223,6 +236,9 @@ func request_Api_StopRecord_0(ctx context.Context, marshaler runtime.Marshaler, if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && !errors.Is(err, io.EOF) { return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) } + if req.Body != nil { + _, _ = io.Copy(io.Discard, req.Body) + } val, ok := pathParams["streamPath"] if !ok { return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "streamPath") @@ -264,6 +280,9 @@ func request_Api_CreateTag_0(ctx context.Context, marshaler runtime.Marshaler, c if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && !errors.Is(err, io.EOF) { return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) } + if req.Body != nil { + _, _ = io.Copy(io.Discard, req.Body) + } msg, err := client.CreateTag(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD)) return msg, metadata, err } @@ -289,6 +308,9 @@ func request_Api_UpdateTag_0(ctx context.Context, marshaler runtime.Marshaler, c if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && !errors.Is(err, io.EOF) { return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) } + if req.Body != nil { + _, _ = io.Copy(io.Discard, req.Body) + } val, ok := pathParams["id"] if !ok { return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "id") @@ -331,6 +353,9 @@ func request_Api_DeleteTag_0(ctx context.Context, marshaler runtime.Marshaler, c if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && !errors.Is(err, io.EOF) { return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) } + if req.Body != nil { + _, _ = io.Copy(io.Discard, req.Body) + } val, ok := pathParams["id"] if !ok { return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "id") @@ -371,7 +396,9 @@ func request_Api_ListTag_0(ctx context.Context, marshaler runtime.Marshaler, cli protoReq ReqListTag metadata runtime.ServerMetadata ) - io.Copy(io.Discard, req.Body) + if req.Body != nil { + _, _ = io.Copy(io.Discard, req.Body) + } if err := req.ParseForm(); err != nil { return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) } diff --git a/plugin/mp4/pb/mp4.proto b/plugin/mp4/pb/mp4.proto index f8a275e2..455a36d7 100644 --- a/plugin/mp4/pb/mp4.proto +++ b/plugin/mp4/pb/mp4.proto @@ -106,6 +106,7 @@ message ReqStartRecord { string streamPath = 1; google.protobuf.Duration fragment = 2; string filePath = 3; + string fileName = 4; } message ResponseStartRecord { @@ -171,4 +172,4 @@ message ResponseTagList { string message = 2; repeated TagInfo list = 3; uint32 total = 4; -} \ No newline at end of file +} diff --git a/plugin/mp4/pb/mp4_grpc.pb.go b/plugin/mp4/pb/mp4_grpc.pb.go index 8eff1830..e3460c74 100644 --- a/plugin/mp4/pb/mp4_grpc.pb.go +++ b/plugin/mp4/pb/mp4_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.5.1 -// - protoc v5.29.3 +// - protoc v6.33.1 // source: mp4.proto package pb diff --git a/plugin/mp4/pkg/record.go b/plugin/mp4/pkg/record.go index 5bd078a9..7b80f02e 100644 --- a/plugin/mp4/pkg/record.go +++ b/plugin/mp4/pkg/record.go @@ -6,9 +6,11 @@ import ( "io" "os" "path/filepath" + "strings" "time" task "github.com/langhuihui/gotask" + "gorm.io/gorm" m7s "m7s.live/v5" "m7s.live/v5/pkg" "m7s.live/v5/pkg/codec" @@ -28,12 +30,17 @@ var writeTrailerQueueTask WriteTrailerQueueTask type writeTrailerTask struct { task.Task - muxer *Muxer - file storage.File - filePath string + muxer *Muxer + file storage.File + filePath string + recordID uint // 录像记录ID + targetStorage map[string]any // 目标存储配置 + deleteLocal bool // 上传成功后是否删除本地文件 + db *gorm.DB // 数据库连接 } func (task *writeTrailerTask) Start() (err error) { + task.Info("write trailer start") err = task.muxer.WriteTrailer(task.file) if err != nil { task.Error("write trailer", "err", err) @@ -139,6 +146,15 @@ func (r *Recorder) writeTailer(end time.Time) { } var CustomFileName = func(job *m7s.RecordJob) string { + // 如果指定了文件名,使用指定的文件名 + if fn := job.RecConf.FileName; fn != "" { + // 确保文件名包含 .mp4 扩展名 + if !strings.HasSuffix(strings.ToLower(fn), ".mp4") { + fn = fn + ".mp4" + } + return filepath.Join(job.RecConf.FilePath, fn) + } + // 否则使用时间戳生成文件名 now := time.Now() return filepath.Join(job.RecConf.FilePath, fmt.Sprintf("%s_%09d.mp4", time.Now().Local().Format("2006-01-02-15-04-05"), now.Nanosecond())) } diff --git a/recoder.go b/recoder.go index d349947d..5757be96 100644 --- a/recoder.go +++ b/recoder.go @@ -2,6 +2,7 @@ package m7s import ( "fmt" + "path/filepath" "time" "gorm.io/gorm" @@ -72,6 +73,7 @@ func (r *DefaultRecorder) CreateStream(start time.Time, customFileName func(*Rec // 生成文件路径 filePath := customFileName(recordJob) + fileName := filepath.Base(filePath) var storageType string recordJob.storage = recordJob.Plugin.Server.Storage @@ -87,6 +89,7 @@ func (r *DefaultRecorder) CreateStream(start time.Time, customFileName func(*Rec StartTime: start, StreamPath: sub.StreamPath, FilePath: filePath, + Filename: fileName, Type: recordJob.RecConf.Type, StorageLevel: 1, // 默认为主存储 StorageType: storageType, @@ -98,6 +101,7 @@ func (r *DefaultRecorder) CreateStream(start time.Time, customFileName func(*Rec if sub.Publisher.HasVideoTrack() { r.Event.VideoCodec = sub.Publisher.VideoTrack.ICodecCtx.String() } + if recordJob.Plugin.DB != nil && recordJob.RecConf.Mode != config.RecordModeTest { if recordJob.Event != nil { r.Event.RecordEvent = recordJob.Event