Reading Go source code: the livego live-streaming system

January 26, 2022

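Before analyzing the source code, let's set up a live-streaming system:

Live-streaming server

https://github.com/gwuhaolin/livego

Playback site

https://github.com/Bilibili/flv.js/

Stream pushing

https://github.com/obsproject/obs-studio

First, start the live-streaming server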

./livego --flv_dir=./data --level=debug

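1. Once started, the livego service listens on the following ports by default:

Port 8090: the console; HTTP requests let you inspect and control the push and pull streams of live rooms

Port 1935: RTMP push and pull; at the moment it seems streams can only be pushed over RTMP

Port 7001: FLV pull

Port 7002: HLS pull

2. Create a live room:

Request: http://your-server-address:8090/control/get?room=ROOM_NAME (choose the room name yourself)

Successful response: {"status":200,"data":a key string bound to the room name}. The original description calls it an MD5 key, but the key in the example below is a 48-character random string.

http://127.0.0.1:8090/control/get?room=xiazemin
{status: 200,data: "rfBd56ti2SMtYvSgD5xAV0YU99zampta7Z7S575KLkIZ9PYk"}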
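The room-creation request above can also be issued from code. The following is a minimal sketch of building the request URL and parsing the response. It assumes the body on the wire is regular JSON with quoted keys (the sample above looks like a browser rendering), and the names `controlResponse`, `roomKeyURL` and `parseRoomKey` are hypothetical helpers, not part of livego.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/url"
)

// controlResponse mirrors the {status, data} envelope shown in the sample.
// Data is kept raw because other endpoints return objects instead of strings.
type controlResponse struct {
	Status int             `json:"status"`
	Data   json.RawMessage `json:"data"`
}

// roomKeyURL builds the /control/get URL for a room on the given host:port.
func roomKeyURL(host, room string) string {
	q := url.Values{}
	q.Set("room", room)
	return "http://" + host + "/control/get?" + q.Encode()
}

// parseRoomKey extracts the key string from a /control/get response body.
func parseRoomKey(body []byte) (string, error) {
	var resp controlResponse
	if err := json.Unmarshal(body, &resp); err != nil {
		return "", err
	}
	if resp.Status != 200 {
		return "", fmt.Errorf("unexpected status %d: %s", resp.Status, resp.Data)
	}
	var key string
	if err := json.Unmarshal(resp.Data, &key); err != nil {
		return "", err
	}
	return key, nil
}

func main() {
	fmt.Println(roomKeyURL("127.0.0.1:8090", "xiazemin"))

	// Body assumed to be what the server sends for the sample above.
	body := []byte(`{"status":200,"data":"rfBd56ti2SMtYvSgD5xAV0YU99zampta7Z7S575KLkIZ9PYk"}`)
	key, err := parseRoomKey(body)
	fmt.Println(key, err)
}
```

In a real client the body would come from `http.Get(roomKeyURL(...))`; it is inlined here so the sketch runs without a server.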

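3. Start pushing the stream

The default app name can be found in the configuration file livego.yaml. For the stream key in the streaming software, enter the room name; here it is set to xiazemin.

server:
- appname: live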
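To make the relationship between the app name, the stream name and the URLs concrete, here is a small sketch that assembles the RTMP push address and the HTTP-FLV playback address. The ports are the defaults listed earlier and the URL shapes follow the addresses used elsewhere in this article; `rtmpURL` and `flvURL` are hypothetical helper names.

```go
package main

import "fmt"

// Default ports from the list of listening ports; adjust if your
// configuration differs.
const (
	rtmpPort = 1935
	flvPort  = 7001
)

// rtmpURL returns the RTMP address for an app and stream name.
func rtmpURL(host, app, stream string) string {
	return fmt.Sprintf("rtmp://%s:%d/%s/%s", host, rtmpPort, app, stream)
}

// flvURL returns the HTTP-FLV playback address for an app and stream name.
func flvURL(host, app, stream string) string {
	return fmt.Sprintf("http://%s:%d/%s/%s.flv", host, flvPort, app, stream)
}

func main() {
	fmt.Println(rtmpURL("127.0.0.1", "live", "xiazemin"))
	fmt.Println(flvURL("127.0.0.1", "live", "xiazemin"))
}
```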


4. Start a web server and pull the video

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Course live stream</title>
</head>
<body>
    <script src="flv.min.js"></script>
    <video id="videoElement" autoplay controls loop preload muted></video>
    <script>
        if (flvjs.isSupported()) {
            var videoElement = document.getElementById('videoElement');
            var flvPlayer = flvjs.createPlayer({
                type: 'flv',
                isLive: true,
                url: 'http://127.0.0.1:7001/live/rfBd56ti2SMtYvSgD5xAV0YU99zampta7Z7S575KLkIZ9PYk.flv'
            });
            flvPlayer.attachMediaElement(videoElement);
            flvPlayer.load();
            flvPlayer.play();
            // Unmute on the first click. The original used jQuery here,
            // which this page never loads, so use the element directly.
            document.body.addEventListener('mousedown', function () {
                videoElement.muted = false;
            }, false);
        }
    </script>
</body>
</html>

Download the flv playback library we depend on

https://cdn.bootcdn.net/ajax/libs/flv.js/1.6.1/flv.min.js

Start a static server that serves the page above

package main

import (
	"fmt"
	"net/http"
)

// handler adds CORS headers and then serves files from the current directory.
// http://newbt.net/ms/vdisk/show_bbs.php?id=56A46991BF52F1A048DB5F0350D74D01&page=0
func handler(w http.ResponseWriter, r *http.Request) {
	w.Header().Add("Access-Control-Allow-Origin", "*")
	w.Header().Add("Access-Control-Allow-Methods", "GET, DELETE, HEAD, OPTIONS,POST,PUT,PATCH")
	w.Header().Add("Access-Control-Allow-Headers", "x-requested-with,content-type")
	w.Header().Add("Access-Control-Allow-Credentials", "true")
	http.FileServer(http.Dir("./")).ServeHTTP(w, r)
}

// https://stackoverflow.com/questions/69435888/how-to-ensure-cors-response-header-values-are-valid-when-querying-mongodb-in-n
func main() {
	http.HandleFunc("/", handler)
	if err := http.ListenAndServe(":8089", nil); err != nil {
		fmt.Println(err)
	}
}

The live-streaming server is now set up.

livego also provides other interfaces for managing push and pull streams and for statistics

http://127.0.0.1:8090/control/pull?&oper=start&app=live&name=xiazemin&url=rtmp://127.0.0.1:1935/live/xiazemin
{status: 400,data: "<h1>push url start rtmp://127.0.0.1:1935/live/xiazemin ok</h1></br>"}
http://127.0.0.1:8090/stat/livestat
{status: 200,data: {publishers: [{key: "live/mPjVIUp97tQjNHJcaOAGeDZRvcMdGIASmHsVKxASAgqjn9FS",url: "rtmp://localhost:1935/live/mPjVIUp97tQjNHJcaOAGeDZRvcMdGIASmHsVKxASAgqjn9FS",stream_id: 1,video_total_bytes: 1940374,video_speed: 1465,audio_total_bytes: 206931,audio_speed: 161}],players: null}}
http://127.0.0.1:8090/control/push?&oper=start&app=live&name=xiazemin&url=rtmp://127.0.0.1:1935/live/xiazemin
{status: 200,data: "<h1>push url start rtmp://127.0.0.1:1935/live/xiazemin ok</h1></br>"}
http://127.0.0.1:8090/control/pull?&oper=stop&app=live&name=xiazemin&url=rtmp://127.0.0.1:1935/live/xiazemin
{status: 400,data: "<h1>push url stop rtmp://127.0.0.1:1935/live/xiazemin ok</h1></br>"}

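Now let's analyze the source code. It consists of three parts: pushing, pulling, and push/pull management. I start from livego's entry point, the main function in main.go. It creates the RTMP stream, then starts the two pull services (HLS and HTTP-FLV), then the console server, and finally the RTMP push server: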

// in startRtmp: the server is created either without or with the HLS server
rtmpServer = rtmp.NewRtmpServer(stream, nil)
rtmpServer = rtmp.NewRtmpServer(stream, hlsServer)
rtmpServer.Serve(rtmpListen)

func startHls() *hls.Server
    hlsServer := hls.NewServer()
    go func() {
        hlsServer.Serve(hlsListen)

func startHTTPFlv(stream *rtmp.RtmpStream)
    hdlServer := httpflv.NewServer(stream)
    go func() {
        hdlServer.Serve(flvListen)

func startAPI(stream *rtmp.RtmpStream)
    opServer := api.NewServer(stream, rtmpAddr)
    go func() {
        opServer.Serve(opListen)

// in main
stream := rtmp.NewRtmpStream()
hlsServer = startHls()
startHTTPFlv(stream)
startAPI(stream)
startRtmp(stream, hlsServer)

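Pushing is based on the RTMP protocol, which in turn runs over TCP. The server accepts connections in a loop and starts one goroutine per connection: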

protocol/rtmp/rtmp.go

func (s *Server) Serve(listener net.Listener) (err error)
    netconn, err = listener.Accept()
    conn := core.NewConn(netconn, 4*1024)
    go s.handleConn(conn)

func (s *Server) handleConn(conn *core.Conn) error
    connServer := core.NewConnServer(conn)
    appname, name, _ := connServer.GetInfo()
    key, err := configure.RoomKeys.GetKey(name)
    channel, err := configure.RoomKeys.GetChannel(name)
    pushlist, ret := configure.GetStaticPushUrlList(appname)
    s.handler.HandleWriter(flvWriter.GetWriter(reader.Info()))
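The accept loop with one goroutine per connection can be tried in isolation. The sketch below is not livego code: `serve`, `upperHandler` and `roundTrip` are hypothetical names, and the handler just upper-cases one line of text instead of speaking RTMP. It only illustrates the control flow of `Serve` and `handleConn`.

```go
package main

import (
	"bufio"
	"fmt"
	"net"
	"strings"
)

// serve accepts connections until the listener is closed and hands each
// one to its own goroutine, like the Serve loop in the excerpt.
func serve(l net.Listener, handle func(net.Conn)) error {
	for {
		conn, err := l.Accept()
		if err != nil {
			return err // listener closed or accept failed
		}
		go handle(conn)
	}
}

// upperHandler is a stand-in for handleConn: it reads one line and
// replies with the upper-cased text.
func upperHandler(c net.Conn) {
	defer c.Close()
	line, err := bufio.NewReader(c).ReadString('\n')
	if err != nil {
		return
	}
	fmt.Fprint(c, strings.ToUpper(line))
}

// roundTrip starts a server on an ephemeral loopback port, sends msg as
// one line and returns the reply without the trailing newline.
func roundTrip(msg string) (string, error) {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "", err
	}
	defer l.Close()
	go serve(l, upperHandler)

	c, err := net.Dial("tcp", l.Addr().String())
	if err != nil {
		return "", err
	}
	defer c.Close()
	if _, err := fmt.Fprintln(c, msg); err != nil {
		return "", err
	}
	reply, err := bufio.NewReader(c).ReadString('\n')
	return strings.TrimSpace(reply), err
}

func main() {
	fmt.Println(roundTrip("publish"))
}
```

Because every connection runs in its own goroutine, a slow or stalled publisher does not block the accept loop.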

In essence, the work is to copy data from the input channel to the output channel. The streams themselves are stored in a sync.Map:

protocol/rtmp/stream.go

type RtmpStream struct {
    streams *sync.Map //key
}

func (rs *RtmpStream) HandleReader(r av.ReadCloser)
    i, ok := rs.streams.Load(info.Key)
    ns := NewStream()
    rs.streams.Store(info.Key, ns)
    stream.AddReader(r)

func (rs *RtmpStream) HandleWriter(w av.WriteCloser)
    item, ok := rs.streams.Load(info.Key)
    rs.streams.Store(info.Key, s)
    s.AddWriter(w)

func (rs *RtmpStream) CheckAlive()
    rs.streams.Delete(key)
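The idea of a keyed stream registry that fans packets out to its writers can be sketched as follows. This is a simplified analogue under stated assumptions, not the livego implementation: `hub`, `stream`, `addWriter` and `publish` are hypothetical names, packets are plain strings, and slow players simply drop packets.

```go
package main

import (
	"fmt"
	"sync"
)

// hub plays the role of RtmpStream: one entry per stream key.
type hub struct {
	streams sync.Map // key (string) -> *stream
}

// stream holds the players (writers) attached to one publisher.
type stream struct {
	mu      sync.Mutex
	writers []chan string
}

// get returns the stream for key, creating it on first use.
func (h *hub) get(key string) *stream {
	v, _ := h.streams.LoadOrStore(key, &stream{})
	return v.(*stream)
}

// addWriter registers a player and returns the channel it receives on.
func (h *hub) addWriter(key string) <-chan string {
	ch := make(chan string, 16)
	s := h.get(key)
	s.mu.Lock()
	s.writers = append(s.writers, ch)
	s.mu.Unlock()
	return ch
}

// publish copies one packet to every writer of the stream and reports
// how many writers accepted it.
func (h *hub) publish(key, pkt string) int {
	s := h.get(key)
	s.mu.Lock()
	defer s.mu.Unlock()
	n := 0
	for _, w := range s.writers {
		select {
		case w <- pkt:
			n++
		default: // player buffer is full: drop the packet for this player
		}
	}
	return n
}

func main() {
	h := &hub{}
	player := h.addWriter("live/xiazemin")
	delivered := h.publish("live/xiazemin", "video-packet-1")
	fmt.Println(delivered, <-player)
}
```

Keying both sides by the same string is what lets an HTTP-FLV or HLS player find the stream that an RTMP publisher registered.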

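HLS (HTTP Live Streaming) transfers two kinds of content: the M3U8 description file and the TS media files. The video in the TS media files must be H.264-encoded, and the audio must be AAC- or MP3-encoded.

protocol/hls/hls.go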

type Server struct {
    listener net.Listener
    conns    *sync.Map
}

func (server *Server) Serve(listener net.Listener) error
    mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { server.handle(w, r) })

func (server *Server) handle(w http.ResponseWriter, r *http.Request)
    if path.Base(r.URL.Path) == "crossdomain.xml"
    switch path.Ext(r.URL.Path)
    case ".m3u8":
        key, _ := server.parseM3u8(r.URL.Path)
        conn := server.getConn(key)
        tsCache := conn.GetCacheInc()
        body, err := tsCache.GenM3U8PlayList()
        w.Header().Set("Access-Control-Allow-Origin", "*")
        w.Write(body)
    case ".ts":
        key, _ := server.parseTs(r.URL.Path)
        conn := server.getConn(key)
        tsCache := conn.GetCacheInc()
        item, err := tsCache.GetItem(r.URL.Path)

The live-streaming system we set up earlier uses the HTTP-FLV protocol:

protocol/httpflv/server.go

type Server struct {
    handler av.Handler
}

func (server *Server) Serve(l net.Listener) error
    mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { server.handleConn(w, r) })
    mux.HandleFunc("/streams", func(w http.ResponseWriter, r *http.Request) { server.getStream(w, r) })

func (server *Server) handleConn(w http.ResponseWriter, r *http.Request)
    path := strings.TrimSuffix(strings.TrimLeft(u, "/"), ".flv")
    paths := strings.SplitN(path, "/", 2)
    msgs := server.getStreams(w, r)
    for _, item := range msgs.Publishers {
        if item.Key == path {
            include = true
    writer := NewFLVWriter(paths[0], paths[1], url, w)
    server.handler.HandleWriter(writer)

func (server *Server) getStream(w http.ResponseWriter, r *http.Request)
    msgs := server.getStreams(w, r)
    w.Write(resp)

func (server *Server) getStreams(w http.ResponseWriter, r *http.Request) *streams
    rtmpStream := server.handler.(*rtmp.RtmpStream)
    msgs := new(streams)
    rtmpStream.GetStreams().Range(func(key, val interface{}) bool {
        msg := stream{key.(string), s.GetReader().Info().UID}
        msgs.Publishers = append(msgs.Publishers, msg)
    rtmpStream.GetStreams().Range(func(key, val interface{}) bool {
        ws := val.(*rtmp.Stream).GetWs()
        ws.Range(func(k, v interface{}) bool {
            msg := stream{key.(string), pw.GetWriter().Info().UID}
            msgs.Players = append(msgs.Players, msg)
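The path handling in `handleConn` can be tried on its own. The sketch below uses the same `TrimLeft`, `TrimSuffix` and `SplitN` calls as the excerpt to split a request path into app and stream name; the function name `parseFLVPath` and the extra validation are additions for illustration, not livego code.

```go
package main

import (
	"fmt"
	"strings"
)

// parseFLVPath splits a request path such as "/live/name.flv" into the
// app ("live") and the stream name ("name").
func parseFLVPath(p string) (app, name string, err error) {
	if !strings.HasSuffix(p, ".flv") {
		return "", "", fmt.Errorf("not an flv path: %q", p)
	}
	trimmed := strings.TrimSuffix(strings.TrimLeft(p, "/"), ".flv")
	parts := strings.SplitN(trimmed, "/", 2)
	if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
		return "", "", fmt.Errorf("expected /app/name.flv, got %q", p)
	}
	return parts[0], parts[1], nil
}

func main() {
	app, name, err := parseFLVPath("/live/rfBd56ti2SMtYvSgD5xAV0YU99zampta7Z7S575KLkIZ9PYk.flv")
	fmt.Println(app, name, err)
}
```

The trimmed path (app plus name) is also the value compared against the publisher keys, which is why a player can only attach to a stream that is currently being published.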

protocol/hls/cache.go

func (tcCacheItem *TSCacheItem) GenM3U8PlayList() ([]byte, error)
    w.Write(m3u8body.Bytes())

func (tcCacheItem *TSCacheItem) GetItem(key string) (TSItem, error)
    item, ok := tcCacheItem.lm[key]

type TSCacheItem struct {
    id   string
    num  int
    lock sync.RWMutex
    ll   *list.List
    lm   map[string]TSItem
}
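To illustrate how a list plus a map can back a live playlist, here is a minimal sketch of a sliding-window TS cache that renders an M3U8 playlist. It is an assumption-laden illustration rather than the code in cache.go: `segment`, `tsCache`, `add` and `playlist` are hypothetical names, locking is omitted, and the playlist tags are the basic ones for a live stream.

```go
package main

import (
	"bytes"
	"container/list"
	"fmt"
	"math"
)

// segment describes one TS file in the window.
type segment struct {
	name     string
	seq      int
	duration float64 // seconds
}

// tsCache keeps the newest max segments: ll preserves order, lm gives
// direct lookup by name (the same pairing as ll/lm in TSCacheItem).
type tsCache struct {
	max int
	ll  *list.List
	lm  map[string]segment
}

func newTSCache(max int) *tsCache {
	return &tsCache{max: max, ll: list.New(), lm: make(map[string]segment)}
}

// add appends a segment and evicts the oldest one when the window is full.
func (c *tsCache) add(s segment) {
	if c.ll.Len() > 0 && c.ll.Len() >= c.max {
		oldest := c.ll.Front()
		delete(c.lm, oldest.Value.(string))
		c.ll.Remove(oldest)
	}
	c.ll.PushBack(s.name)
	c.lm[s.name] = s
}

// playlist renders the current window as a live M3U8 playlist
// (no #EXT-X-ENDLIST, because the stream is still running).
func (c *tsCache) playlist() []byte {
	var body bytes.Buffer
	maxDur, firstSeq := 0.0, 0
	for e := c.ll.Front(); e != nil; e = e.Next() {
		s := c.lm[e.Value.(string)]
		if e == c.ll.Front() {
			firstSeq = s.seq
		}
		maxDur = math.Max(maxDur, s.duration)
		fmt.Fprintf(&body, "#EXTINF:%.3f,\n%s\n", s.duration, s.name)
	}
	var out bytes.Buffer
	fmt.Fprintf(&out, "#EXTM3U\n#EXT-X-VERSION:3\n#EXT-X-TARGETDURATION:%d\n#EXT-X-MEDIA-SEQUENCE:%d\n",
		int(math.Ceil(maxDur)), firstSeq)
	out.Write(body.Bytes())
	return out.Bytes()
}

func main() {
	c := newTSCache(2)
	c.add(segment{"a.ts", 0, 4.0})
	c.add(segment{"b.ts", 1, 3.2})
	c.add(segment{"c.ts", 2, 4.0}) // evicts a.ts
	fmt.Print(string(c.playlist()))
}
```

A player polls the playlist repeatedly; as old segments fall out of the window, the media sequence number advances so the player knows where it is in the stream.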

protocol/httpflv/writer.go

type FLVWriter struct {
    Uid string
    av.RWBaser
    app, title, url string
    buf             []byte
    closed          bool
    closedChan      chan struct{}
    ctx             http.ResponseWriter
    packetQueue     chan *av.Packet
}

av/av.go

type Handler interface {
    HandleReader(ReadCloser)
    HandleWriter(WriteCloser)
}

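The management and control interface of the live service is implemented in protocol/api/api.go:

type Server struct {
    handler  av.Handler
    session  map[string]*rtmprelay.RtmpRelay
    rtmpAddr string
}

It also implements a static file server; any static files can be placed in the statics directory: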

func (s *Server) Serve(l net.Listener) error
    mux.Handle("/statics/", http.StripPrefix("/statics/", http.FileServer(http.Dir("statics"))))
    mux.HandleFunc("/control/push", func(w http.ResponseWriter, r *http.Request) { s.handlePush(w, r) })
    mux.HandleFunc("/control/pull", func(w http.ResponseWriter, r *http.Request) { s.handlePull(w, r) })
    mux.HandleFunc("/control/get", func(w http.ResponseWriter, r *http.Request) { s.handleGet(w, r) })
    mux.HandleFunc("/control/reset", func(w http.ResponseWriter, r *http.Request) { s.handleReset(w, r) })
    mux.HandleFunc("/control/delete", func(w http.ResponseWriter, r *http.Request) { s.handleDelete(w, r) })
    mux.HandleFunc("/stat/livestat", func(w http.ResponseWriter, r *http.Request) { s.GetLiveStatics(w, r) })

func (s *Server) handlePush(w http.ResponseWriter, req *http.Request)
    // http://127.0.0.1:8090/control/push?&oper=start&app=live&name=123456&url=rtmp://192.168.16.136/live/123456
    if oper == "stop"
        pushRtmprelay, found := s.session[keyString]
        pushRtmprelay.Stop()
        delete(s.session, keyString)
    else
        pushRtmprelay := rtmprelay.NewRtmpRelay(&localurl, &remoteurl)
        err = pushRtmprelay.Start()
        s.session[keyString] = pushRtmprelay

func (s *Server) handlePull(w http.ResponseWriter, req *http.Request)
    // http://127.0.0.1:8090/control/pull?&oper=start&app=live&name=123456&url=rtmp://192.168.16.136/live/123456
    if oper == "stop"
        pullRtmprelay, found := s.session[keyString]
        pullRtmprelay.Stop()
    else
        pullRtmprelay := rtmprelay.NewRtmpRelay(&localurl, &remoteurl)
        err = pullRtmprelay.Start()
        s.session[keyString] = pullRtmprelay

func (s *Server) handleGet(w http.ResponseWriter, r *http.Request)
    // http://127.0.0.1:8090/control/get?room=ROOM_NAME
    msg, err := configure.RoomKeys.GetKey(room)

func (s *Server) handleReset(w http.ResponseWriter, r *http.Request)
    // http://127.0.0.1:8090/control/reset?room=ROOM_NAME
    msg, err := configure.RoomKeys.SetKey(room)

func (s *Server) handleDelete(w http.ResponseWriter, r *http.Request)
    // http://127.0.0.1:8090/control/delete?room=ROOM_NAME
    configure.RoomKeys.DeleteChannel(room)

func (server *Server) GetLiveStatics(w http.ResponseWriter, req *http.Request)
    // http://127.0.0.1:8090/stat/livestat
    rtmpStream := server.handler.(*rtmp.RtmpStream)
    rtmpStream.GetStreams().Range(func(key, val interface{}) bool {
        v := s.GetReader().(*rtmp.VirReader)
        msg := stream{key.(string), v.Info().URL, v.ReadBWInfo.StreamId, v.ReadBWInfo.VideoDatainBytes, v.ReadBWInfo.VideoSpeedInBytesperMS,
            v.ReadBWInfo.AudioDatainBytes, v.ReadBWInfo.AudioSpeedInBytesperMS}
    rtmpStream.GetStreams().Range(func(key, val interface{}) bool {
        v := pw.GetWriter().(*rtmp.VirWriter)
        msg := stream{key.(string), v.Info().URL, v.WriteBWInfo.StreamId, v.WriteBWInfo.VideoDatainBytes, v.WriteBWInfo.VideoSpeedInBytesperMS,
            v.WriteBWInfo.AudioDatainBytes, v.WriteBWInfo.AudioSpeedInBytesperMS}
    roomInfo, exists := (rtmpStream.GetStreams()).Load(room)

// Serve wraps the mux in the JWT middleware
http.Serve(l, JWTMiddleware(mux))

// error handler configured for the JWT middleware
ErrorHandler: func(w http.ResponseWriter, r *http.Request, err string) {
    res := &Response{
        w:      w,
        Status: 403,
        Data:   err,
    }
    res.SendJson()
},

protocol/rtmp/rtmprelay/rtmprelay.go

type RtmpRelay struct {
    PlayUrl              string
    PublishUrl           string
    cs_chan              chan core.ChunkStream
    sndctrl_chan         chan string
    connectPlayClient    *core.ConnClient
    connectPublishClient *core.ConnClient
    startflag            bool
}

func (self *RtmpRelay) Start() error
    self.connectPlayClient = core.NewConnClient()
    self.connectPublishClient = core.NewConnClient()
    err := self.connectPlayClient.Start(self.PlayUrl, av.PLAY)
    err = self.connectPublishClient.Start(self.PublishUrl, av.PUBLISH)
    go self.rcvPlayChunkStream()
    go self.sendPublishChunkStream()

func (self *RtmpRelay) rcvPlayChunkStream()
    err := self.connectPlayClient.Read(&rc)
    r := bytes.NewReader(rc.Data)
    vs, err := self.connectPlayClient.DecodeBatch(r, amf.AMF0)
    self.cs_chan <- rc

func (self *RtmpRelay) sendPublishChunkStream()
    self.connectPublishClient.Write(rc)
    self.connectPublishClient.Close(nil)
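The relay is a producer and a consumer joined by a channel: one goroutine reads chunks from the play side and the other writes them to the publish side. The sketch below shows that shape with plain `io.Reader` and `io.Writer` values instead of RTMP connections; `relay` and `relayString` are hypothetical names and the chunks are raw bytes rather than `core.ChunkStream` values.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"strings"
)

// relay reads chunks from src in one goroutine and writes them to dst in
// the calling goroutine, passing them through a channel (the role that
// cs_chan plays in RtmpRelay). It returns the number of bytes written.
func relay(src io.Reader, dst io.Writer, chunkSize int) (int, error) {
	if chunkSize <= 0 {
		return 0, errors.New("chunkSize must be positive")
	}
	chunks := make(chan []byte, 8)
	readErr := make(chan error, 1)

	go func() {
		defer close(chunks)
		for {
			buf := make([]byte, chunkSize)
			n, err := src.Read(buf)
			if n > 0 {
				chunks <- buf[:n]
			}
			if err != nil {
				if err != io.EOF {
					readErr <- err
				}
				return
			}
		}
	}()

	total := 0
	var writeErr error
	for c := range chunks {
		if writeErr != nil {
			continue // keep draining so the reader goroutine can exit
		}
		n, err := dst.Write(c)
		total += n
		writeErr = err
	}
	if writeErr != nil {
		return total, writeErr
	}
	select {
	case err := <-readErr:
		return total, err
	default:
		return total, nil
	}
}

// relayString is a small helper that relays a string through relay.
func relayString(s string, chunkSize int) (string, error) {
	var out bytes.Buffer
	_, err := relay(strings.NewReader(s), &out, chunkSize)
	return out.String(), err
}

func main() {
	fmt.Println(relayString("hello rtmp relay", 4))
}
```

The buffered channel decouples the two sides, so a short stall on the publish connection does not immediately block reading from the play connection.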

configure/channel.go

var RoomKeys = &RoomKeysType{
    localCache: cache.New(cache.NoExpiration, 0),
}

type RoomKeysType struct {
    redisCli   *redis.Client
    localCache *cache.Cache
}

func (r *RoomKeysType) SetKey(channel string) (key string, err error)
    key = uid.RandStringRunes(48)
    if _, err = r.redisCli.Get(key).Result(); err == redis.Nil {
        err = r.redisCli.Set(channel, key, 0).Err()
        err = r.redisCli.Set(key, channel, 0).Err()

func (r *RoomKeysType) DeleteChannel(channel string) bool
    r.redisCli.Del(channel).Err()
    key, ok := r.localCache.Get(channel)
    r.localCache.Delete(channel)
    r.localCache.Delete(key.(string))
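The key store keeps two mappings, channel to key and key to channel, so that a publisher's key can be resolved back to its room. Below is a minimal in-memory sketch of that idea. It is not the livego implementation: `roomKeys`, `newRoomKeys` and `randKey` are hypothetical names, plain maps replace Redis and the local cache, and the key alphabet is an assumption (only the length of 48 comes from the excerpt).

```go
package main

import (
	"crypto/rand"
	"errors"
	"fmt"
	"math/big"
	"sync"
)

// letters is an assumed alphabet for generated keys.
const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"

// randKey returns n random characters drawn from letters.
func randKey(n int) (string, error) {
	b := make([]byte, n)
	for i := range b {
		idx, err := rand.Int(rand.Reader, big.NewInt(int64(len(letters))))
		if err != nil {
			return "", err
		}
		b[i] = letters[idx.Int64()]
	}
	return string(b), nil
}

// roomKeys stores both directions of the channel/key relation.
type roomKeys struct {
	mu        sync.Mutex
	byChannel map[string]string // channel -> key
	byKey     map[string]string // key -> channel
}

func newRoomKeys() *roomKeys {
	return &roomKeys{byChannel: map[string]string{}, byKey: map[string]string{}}
}

// SetKey generates a fresh, unused key for channel and replaces any old one.
func (r *roomKeys) SetKey(channel string) (string, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	for {
		key, err := randKey(48)
		if err != nil {
			return "", err
		}
		if _, taken := r.byKey[key]; taken {
			continue // extremely unlikely collision: try again
		}
		if old, ok := r.byChannel[channel]; ok {
			delete(r.byKey, old)
		}
		r.byChannel[channel] = key
		r.byKey[key] = channel
		return key, nil
	}
}

// GetChannel resolves a key back to its channel.
func (r *roomKeys) GetChannel(key string) (string, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	ch, ok := r.byKey[key]
	if !ok {
		return "", errors.New("key not found")
	}
	return ch, nil
}

// DeleteChannel removes both mappings for channel.
func (r *roomKeys) DeleteChannel(channel string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	key, ok := r.byChannel[channel]
	if !ok {
		return false
	}
	delete(r.byChannel, channel)
	delete(r.byKey, key)
	return true
}

func main() {
	rk := newRoomKeys()
	key, _ := rk.SetKey("xiazemin")
	ch, _ := rk.GetChannel(key)
	fmt.Println(len(key), ch, rk.DeleteChannel("xiazemin"))
}
```

Deleting both directions together matters: if only the channel entry were removed, an old key would still resolve to the room.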
