Developing a streaming video website with Go
InfoQ 2021-06-04 10:14:30
{"type":"doc","content":[{"type":"heading","attrs":{"align":null,"level":2},"content":[{"type":"text","text":"Introduction","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Streaming media has become an important technology in industry, used in live-streaming websites, video surveillance transmission, in-app live broadcasts, and more. Building a high-concurrency video website involves both the choice of language and the use of streaming media technology. This article focuses on how to use Go to build a streaming video website.","attrs":{}}]},{"type":"heading","attrs":{"align":null,"level":2},"content":[{"type":"text","text":"Contents","attrs":{}}]},{"type":"bulletedlist","content":[{"type":"listitem","attrs":{"listStyle":null},"content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Why choose Go, and some of Go's advantages","attrs":{}}]}]},{"type":"listitem","attrs":{"listStyle":null},"content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Introduction to Go and the toolchain for building a web server","attrs":{}}]}]},{"type":"listitem","attrs":{"listStyle":null},"content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Go's channel concurrency model","attrs":{}}]}]},{"type":"listitem","attrs":{"listStyle":null},"content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Building a streaming website with Go","attrs":{}}]}]},{"type":"listitem","attrs":{"listStyle":null},"content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Website deployment
","attrs":{}}]}]}],"attrs":{}},{"type":"heading","attrs":{"align":null,"level":2},"content":[{"type":"text","text":"Why choose Go, and some of Go's advantages","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Why choose Go to develop a video website? The answer lies mainly in the advantages of the language. So what are those advantages?","attrs":{}}]},{"type":"bulletedlist","content":[{"type":"listitem","attrs":{"listStyle":null},"content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","marks":[{"type":"strong","attrs":{}}],"text":"High development efficiency. Most other languages need a lot of extra configuration or plug-ins; even Java, with its all-in-one ecosystem, needs a servlet engine such as Tomcat or Jetty. Go is different here: most of what you need is already integrated in pkg, including a complete development toolchain (tools, test, benchmark, builtin, etc.) and the go commands (go test, go install, go build). Everything is ready to use as soon as you download Go. Third-party audio and video packages and their configuration are likewise managed under pkg. 
Using Go for audio and video is therefore a good illustration of the language's advantages.","attrs":{}}]}]},{"type":"listitem","attrs":{"listStyle":null},"content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","marks":[{"type":"strong","attrs":{}}],"text":"Simple deployment. Go is a compiled language that can cross-compile executables for multiple platforms. It compiles straight to a binary that runs directly on the target platform.","attrs":{}}]}]},{"type":"listitem","attrs":{"listStyle":null},"content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","marks":[{"type":"strong","attrs":{}}],"text":"A good native HTTP library and a built-in template engine, so no third-party framework is needed.","attrs":{}}]}]}],"attrs":{}},{"type":"heading","attrs":{"align":null,"level":2},"content":[{"type":"text","text":"Introduction to Go and the toolchain for building a web server","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Go is a compiled, open-source programming language that makes it easy to build simple, reliable, and efficient software. Its goal is to combine the development speed of dynamic languages such as Python with the performance and safety of compiled languages such as C/C++. 
Its main features are:","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Concise, fast, safe, concurrent, memory-managed, bounds-checked arrays, and quick to compile.","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Go ships with a set of commonly used tools, such as:","attrs":{}}]},{"type":"bulletedlist","content":[{"type":"listitem","attrs":{"listStyle":null},"content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","marks":[{"type":"strong","attrs":{}}],"text":"go build compiles Go files, and it can cross-compile: env GOOS=linux GOARCH=amd64 go build. This is a very useful command in CI/CD.","attrs":{}}]}]},{"type":"listitem","attrs":{"listStyle":null},"content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","marks":[{"type":"strong","attrs":{}}],"text":"go install also compiles, but unlike go build it installs the compiled output: traditionally, library packages are placed under pkg and executables under bin.","attrs":{}}]}]},{"type":"listitem","attrs":{"listStyle":null},"content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","marks":[{"type":"strong","attrs":{}}],"text":"go get fetches third-party Go packages. A common form is go get -u followed by a git address, which fetches the resource from git and installs it locally.","attrs":{}}]}]},{"type":"listitem","attrs":{"listStyle":null},"content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","marks":[{"type":"strong","attrs":{}}],"text":"go fmt unifies code style and layout, which makes Go code easier to read and understand
.","attrs":{}}]}]},{"type":"listitem","attrs":{"listStyle":null},"content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","marks":[{"type":"strong","attrs":{}}],"text":"go test runs the tests in the current directory. \"go test -v\" prints all the information, while \"go test\" prints only the test results.","attrs":{}}]}]},{"type":"listitem","attrs":{"listStyle":null},"content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","marks":[{"type":"strong","attrs":{}}],"text":"Go test files are usually named XXX_test.go. When you run \"go test\", the tool automatically executes the files that follow this naming convention.","attrs":{}}]}]}],"attrs":{}},{"type":"heading","attrs":{"align":null,"level":4},"content":[{"type":"text","text":"Key points:","attrs":{}}]},{"type":"bulletedlist","content":[{"type":"listitem","attrs":{"listStyle":null},"content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","marks":[{"type":"strong","attrs":{}}],"text":"Using TestMain as the entry point for initialization, and calling Run() to run the other tests, lets you perform the setup that the tests need, such as preparing an audio and video resource database or loading files. Such resources may be used many times, but with this pattern they are loaded into memory only once, which reduces memory usage and lets everything be cleaned up in one place.","attrs":{}}]}]}],"attrs":{}},{"type":"codeblock","attrs":{"lang":null},"content":[{"type":"text","text":"func TestMain(m *testing.M) {\n    fmt.Println(\"Test begin\")\n    
m.Run()\n}\n","attrs":{}}]},{"type":"bulletedlist","content":[{"type":"listitem","attrs":{"listStyle":null},"content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","marks":[{"type":"strong","attrs":{}}],"text":"If you do not call Run(), none of the tests other than TestMain are executed.","attrs":{}}]}]}],"attrs":{}},{"type":"codeblock","attrs":{"lang":null},"content":[{"type":"text","text":"func TestPrint(t *testing.T) {\n    fmt.Println(\"Test print\")\n}\n\nfunc TestMain(m *testing.M) {\n    fmt.Println(\"Test begin\")\n    //m.Run()\n}\n","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"As shown above, if Run() is not called, the TestPrint function will not be executed.","attrs":{}}]},{"type":"heading","attrs":{"align":null,"level":2},"content":[{"type":"text","text":"Go's channel concurrency model","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Go has goroutines, so how do goroutines communicate with each other? Go provides the ","attrs":{}},{"type":"text","marks":[{"type":"strong","attrs":{}}],"text":"channel","attrs":{}},{"type":"text","text":" to solve this.","attrs":{}}]},{"type":"heading","attrs":{"align":null,"level":4},"content":[{"type":"text","text":"Declaring a channel","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Declaring a channel in Go is simple. Use the built-in make function, as follows:","attrs":{}}]},{"type":"codeblock","attrs":{"lang":null},"content":[{"type":"text","text":"ch := make(chan string)","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Here chan is a 
keyword that denotes the channel type. The string after it means that the data in the channel is of type string. This also shows that chan behaves like a collection type.","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Once a chan is defined it can be used. A chan supports only two operations, send and receive:","attrs":{}}]},{"type":"bulletedlist","content":[{"type":"listitem","attrs":{"listStyle":null},"content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","marks":[{"type":"strong","attrs":{}}],"text":"Send: sends a value to the chan, placing the value in the chan. The operator is chan <-","attrs":{}}]}]},{"type":"listitem","attrs":{"listStyle":null},"content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","marks":[{"type":"strong","attrs":{}}],"text":"Receive: gets a value from the chan. The operator is <- chan","attrs":{}}]}]}],"attrs":{}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Example:","attrs":{}}]},{"type":"codeblock","attrs":{"lang":null},"content":[{"type":"text","text":"package main\n\nimport \"fmt\"\n\nfunc main() {\n\tch := make(chan string)\n\n\tgo func() {\n\t\tfmt.Println(\"La\")\n\t\tch <- \"sender:La\"\n\t}()\n\n\tfmt.Println(\"I am main goroutine\")\n\tv := <-ch\n\tfmt.Println(\"Value received from chan:\", v)\n}\n","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Run it and look at the printed output:","attrs":{}}]},{"type":"codeblock","attrs":{"lang":null},"content":[{"type":"text","text":"I am main goroutine\nLa\nValue received from chan: sender
:La\n","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"As the output shows, the channel achieves the effect we would otherwise get from the time.Sleep function.","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"It should now be clear why the program does not exit before the new goroutine finishes: the chan created by make holds no value, and the main goroutine tries to receive a value from it. If no value is available, it waits until another goroutine sends a value to the chan.","attrs":{}}]},{"type":"heading","attrs":{"align":null,"level":4},"content":[{"type":"text","text":"Unbuffered channel","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"In the example above, the chan created with make is an unbuffered channel. Its capacity is 0, so it cannot store any data. An unbuffered channel only passes data along; data never stays in the channel. This also means that the send and receive operations on an unbuffered channel happen at the same time, which is why it is also called a synchronous channel.","attrs":{}}]},{"type":"heading","attrs":{"align":null,"level":4},"content":[{"type":"text","text":"Buffered channel","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"A buffered channel is like a ","attrs":{}},{"type":"text","marks":[{"type":"strong","attrs":{}}],"text":"blocking queue","attrs":{}},{"type":"text","text":" whose elements are first in, first out. 
The second argument to the make function specifies the capacity of the channel, which creates a buffered channel, for example:","attrs":{}}]},{"type":"codeblock","attrs":{"lang":null},"content":[{"type":"text","text":"cacheCh := make(chan int, 5)\n","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"This defines a chan with a capacity of 5 whose elements are of type int.","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"A buffered channel has the following characteristics:","attrs":{}}]},{"type":"bulletedlist","content":[{"type":"listitem","attrs":{"listStyle":null},"content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","marks":[{"type":"strong","attrs":{}}],"text":"A buffered channel has an internal buffer queue.","attrs":{}}]}]},{"type":"listitem","attrs":{"listStyle":null},"content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","marks":[{"type":"strong","attrs":{}}],"text":"A send inserts an element at the tail of the queue. If the queue is full, the send blocks and waits until another goroutine performs a receive, which frees space in the queue.","attrs":{}}]}]},{"type":"listitem","attrs":{"listStyle":null},"content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","marks":[{"type":"strong","attrs":{}}],"text":"A receive takes the element at the head of the queue and removes it from the queue. If the queue is empty, the receive blocks and waits until another goroutine performs a send, which inserts a new element.","attrs":{}}]}]}],"attrs":{}},{"type":"codeblock","attrs":{"lang":null},"content":[{"type":"text","text":"cache := make(chan int, 5)\n\ncache <- 2\n\ncache 
<- 3\n\nfmt.Println(\"capacity:\", cap(cache), \", number of elements:\", len(cache))\n","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","marks":[{"type":"strong","attrs":{}}],"text":"An unbuffered channel is really just a channel with a capacity of 0, for example make(chan int, 0).","attrs":{}}]},{"type":"heading","attrs":{"align":null,"level":4},"content":[{"type":"text","text":"Closing a channel","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"The built-in close function closes a channel. Once a channel is closed you can no longer send data to it; doing so causes a panic. You can still receive the data remaining in the channel, and once no data is left, a receive returns the zero value of the element type.","attrs":{}}]},{"type":"heading","attrs":{"align":null,"level":4},"content":[{"type":"text","text":"One-way channel","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"A one-way channel can only send or only receive. 
Declaring a one-way channel is also simple: just add the <- operator to the declaration, as follows:","attrs":{}}]},{"type":"codeblock","attrs":{"lang":null},"content":[{"type":"text","text":"send := make(chan<- int)\nreceive := make(<-chan int)\n","attrs":{}}]},{"type":"heading","attrs":{"align":null,"level":2},"content":[{"type":"text","text":"Building a streaming website with Go","attrs":{}}]},{"type":"heading","attrs":{"align":null,"level":3},"content":[{"type":"text","text":"Business module","attrs":{}}]},{"type":"heading","attrs":{"align":null,"level":4},"content":[{"type":"text","text":"API design","attrs":{}}]},{"type":"bulletedlist","content":[{"type":"listitem","attrs":{"listStyle":null},"content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","marks":[{"type":"strong","attrs":{}}],"text":"Layering","attrs":{}}]}]},{"type":"listitem","attrs":{"listStyle":null},"content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","marks":[{"type":"strong","attrs":{}}],"text":"RESTful-style design","attrs":{}}]}]},{"type":"listitem","attrs":{"listStyle":null},"content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","marks":[{"type":"strong","attrs":{}}],"text":"CRUD operations to distinguish resource actions","attrs":{}}]}]},{"type":"listitem","attrs":{"listStyle":null},"content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","marks":[{"type":"strong","attrs":{}}],"text":"Return code conventions","attrs":{}}]}]}],"attrs":{}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"First, let's write the program entry point:","attrs":{}}]},{"type":"codeblock","attrs":{"lang":null},"content":[{"type":"text","text":"package main \n\nimport (\n 
\"net/http\"\n \"github.com/julienschmidt/httprouter\"\n)\n\ntype middleWareHandler struct {\n r *httprouter.Router\n}\n\nfunc NewMiddleWareHandler(r *httprouter.Router) http.Handler {\n m := middleWareHandler{}\n m.r = r\n return m\n}\n\nfunc (m middleWareHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {\n //check session\n validateUserSession(r)\n\n m.r.ServeHTTP(w, r)\n}\n\nfunc RegisterHandlers() *httprouter.Router {\n router := httprouter.New()\n\n router.POST(\"/user\", CreateUser)\n\n router.POST(\"/user/:user_name\", Login)\n\n return router\n}\n\nfunc main() {\n r := RegisterHandlers()\n mh := NewMiddleWareHandler(r)\n http.ListenAndServe(\":1000\", mh)\n}\n","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Here we have registered the user-creation and login handlers and started listening on a port. Next, we look at back-end processing, where the main concern is the session:","attrs":{}}]},{"type":"codeblock","attrs":{"lang":null},"content":[{"type":"text","text":"package session\n\nimport (\n \"time\"\n \"sync\"\n \"github.com/avenssi/video_server/api/defs\"\n \"github.com/avenssi/video_server/api/dbops\"\n \"github.com/avenssi/video_server/api/utils\"\n)\n\nvar sessionMap *sync.Map \n\nfunc init() {\n sessionMap = &sync.Map{}\n}\n\nfunc nowInMilli() int64{\n return time.Now().UnixNano()/1000000\n}\n\nfunc deleteExpiredSession(sid string) {\n sessionMap.Delete(sid)\n dbops.DeleteSession(sid)\n}\n\nfunc LoadSessionsFromDB() {\n r, err := dbops.RetrieveAllSessions()\n if err != nil {\n  return\n }\n\n r.Range(func(k, v interface{}) bool{\n  ss := v.(*defs.SimpleSession)\n  sessionMap.Store(k, ss)\n  return true\n })\n}\n\nfunc GenerateNewSessionId(un string) string {\n id, _ := utils.NewUUID()\n ct := nowInMilli()\n ttl := ct + 30 * 60 * 1000 // Server-side session valid time: 30 min\n\n ss := &defs.SimpleSession{Username: un, TTL: ttl}\n sessionMap.Store(id, ss)\n 
dbops.InsertSession(id, ttl, un)\n\n return id\n}\n\nfunc IsSessionExpired(sid string) (string, bool) {\n ss, ok := sessionMap.Load(sid)\n if ok {\n  ct := nowInMilli()\n  if ss.(*defs.SimpleSession).TTL < ct {\n   deleteExpiredSession(sid)\n   return \"\", true\n  }\n\n  return ss.(*defs.SimpleSession).Username, false\n }\n\n return \"\", true\n}\n","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"As the code above shows, Go imports packages from the video library avenssi/video_server to handle sessions. This is one reason Go is used for back-end development.","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"We also define a set of error codes and messages:","attrs":{}}]},{"type":"codeblock","attrs":{"lang":null},"content":[{"type":"text","text":"package defs\n\ntype Err struct {\n Error string `json:\"error\"`\n ErrorCode string `json:\"error_code\"`  \n}\n\ntype ErrResponse struct {\n HttpSC int\n Error Err\n}\n\nvar (\n ErrorRequestBodyParseFailed = ErrResponse{HttpSC: 400, Error: Err{Error: \"Request body is not correct\", ErrorCode: \"001\"}}\n ErrorNotAuthUser = ErrResponse{HttpSC: 401, Error: Err{Error: \"User authentication failed.\", ErrorCode: \"002\"}}\n ErrorDBError = ErrResponse{HttpSC: 500, Error: Err{Error: \"DB ops failed\", ErrorCode: \"003\"}}\n ErrorInternalFaults = ErrResponse{HttpSC: 500, Error: Err{Error: \"Internal service error\", ErrorCode: \"004\"}}\n)\n","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"That covers the main processing logic of the business layer. The rest of the article is mainly about pushing and playing streams.","attrs":{}}]},{"type":"heading","attrs":{"align":null,"level":4},"content":[{"type":"text","text":"Stream pushing
","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Video streams are pushed over the RTMP protocol. Pushing a stream needs a cache so that constant communication does not crash the server: every connection is a long-lived link, and when many clients keep sending requests, the handlers on the server side block. For this reason, most audio, video, and streaming transports add a cache to buffer the pressure on the server, so that the service as a whole does not become paralyzed and leave clients unresponsive or stuck:","attrs":{}}]},{"type":"codeblock","attrs":{"lang":"text"},"content":[{"type":"text","text":"type Cache struct {\n\tgop *GopCache\n\tvideoSeq *SpecialCache\n\taudioSeq *SpecialCache\n\tmetadata *SpecialCache\n}\n\nfunc NewCache() *Cache {\n\treturn &Cache{\n\t\tgop: NewGopCache(configure.Config.GetInt(\"gop_num\")),\n\t\tvideoSeq: NewSpecialCache(),\n\t\taudioSeq: NewSpecialCache(),\n\t\tmetadata: NewSpecialCache(),\n\t}\n}\n\nfunc (cache *Cache) Write(p av.Packet) {\n\tif p.IsMetadata {\n\t\tcache.metadata.Write(&p)\n\t\treturn\n\t} else {\n\t\tif !p.IsVideo {\n\t\t\tah, ok := p.Header.(av.AudioPacketHeader)\n\t\t\tif ok {\n\t\t\t\tif ah.SoundFormat() == av.SOUND_AAC &&\n\t\t\t\t\tah.AACPacketType() == av.AAC_SEQHDR {\n\t\t\t\t\tcache.audioSeq.Write(&p)\n\t\t\t\t\treturn\n\t\t\t\t} else {\n\t\t\t\t\treturn\n\t\t\t\t}\n\t\t\t}\n\n\t\t} else {\n\t\t\tvh, ok := p.Header.(av.VideoPacketHeader)\n\t\t\tif ok {\n\t\t\t\tif vh.IsSeq() {\n\t\t\t\t\tcache.videoSeq.Write(&p)\n\t\t\t\t\treturn\n\t\t\t\t}\n\t\t\t} else {\n\t\t\t\treturn\n\t\t\t}\n\n\t\t}\n\t}\n\tcache.gop.Write(&p)\n}\n\nfunc (cache *Cache) Send(w av.WriteCloser) error {\n\tif err := cache.metadata.Send(w); err != nil {\n\t\treturn err\n\t}\n\n\tif err := cache.videoSeq.Send(w); err != nil 
{\n\t\treturn err\n\t}\n\n\tif err := cache.audioSeq.Send(w); err != nil {\n\t\treturn err\n\t}\n\n\tif err := cache.gop.Send(w); err != nil {\n\t\treturn err\n\t}\n\n\treturn nil\n}","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":" In push streaming , The main thing is to choose first :","attrs":{}}]},{"type":"codeblock","attrs":{"lang":"text"},"content":[{"type":"text","text":"const (\n\tmaxQueueNum = 1024\n\tSAVE_STATICS_INTERVAL = 5000\n)\n\nvar (\n\treadTimeout = configure.Config.GetInt(\"read_timeout\")\n\twriteTimeout = configure.Config.GetInt(\"write_timeout\")\n)\n\ntype Client struct {\n\thandler av.Handler\n\tgetter av.GetWriter\n}\n\nfunc NewRtmpClient(h av.Handler, getter av.GetWriter) *Client {\n\treturn &Client{\n\t\thandler: h,\n\t\tgetter: getter,\n\t}\n}\n\nfunc (c *Client) Dial(url string, method string) error {\n\tconnClient := core.NewConnClient()\n\tif err := connClient.Start(url, method); err != nil {\n\t\treturn err\n\t}\n\tif method == av.PUBLISH {\n\t\twriter := NewVirWriter(connClient)\n\t\tlog.Debugf(\"client Dial call NewVirWriter url=%s, method=%s\", url, method)\n\t\tc.handler.HandleWriter(writer)\n\t} else if method == av.PLAY {\n\t\treader := NewVirReader(connClient)\n\t\tlog.Debugf(\"client Dial call NewVirReader url=%s, method=%s\", url, method)\n\t\tc.handler.HandleReader(reader)\n\t\tif c.getter != nil {\n\t\t\twriter := c.getter.GetWriter(reader.Info())\n\t\t\tc.handler.HandleWriter(writer)\n\t\t}\n\t}\n\treturn nil\n}\n\nfunc (c *Client) GetHandle() av.Handler {\n\treturn c.handler\n}\n\ntype Server struct {\n\thandler av.Handler\n\tgetter av.GetWriter\n}\n\nfunc NewRtmpServer(h av.Handler, getter av.GetWriter) *Server {\n\treturn &Server{\n\t\thandler: h,\n\t\tgetter: getter,\n\t}\n}\n\nfunc (s *Server) Serve(listener net.Listener) (err error) {\n\tdefer func() {\n\t\tif r := recover(); r != nil {\n\t\t\tlog.Error(\"rtmp serve panic: \", 
r)\n\t\t}\n\t}()\n\n\tfor {\n\t\tvar netconn net.Conn\n\t\tnetconn, err = listener.Accept()\n\t\tif err != nil {\n\t\t\treturn\n\t\t}\n\t\tconn := core.NewConn(netconn, 4*1024)\n\t\tlog.Debug(\"new client, connect remote: \", conn.RemoteAddr().String(),\n\t\t\t\"local:\", conn.LocalAddr().String())\n\t\tgo s.handleConn(conn)\n\t}\n}\n\nfunc (s *Server) handleConn(conn *core.Conn) error {\n\tif err := conn.HandshakeServer(); err != nil {\n\t\tconn.Close()\n\t\tlog.Error(\"handleConn HandshakeServer err: \", err)\n\t\treturn err\n\t}\n\tconnServer := core.NewConnServer(conn)\n\n\tif err := connServer.ReadMsg(); err != nil {\n\t\tconn.Close()\n\t\tlog.Error(\"handleConn read msg err: \", err)\n\t\treturn err\n\t}\n\n\tappname, name, _ := connServer.GetInfo()\n\n\tif ret := configure.CheckAppName(appname); !ret {\n\t\terr := fmt.Errorf(\"application name=%s is not configured\", appname)\n\t\tconn.Close()\n\t\tlog.Error(\"CheckAppName err: \", err)\n\t\treturn err\n\t}\n\n\tlog.Debugf(\"handleConn: IsPublisher=%v\", connServer.IsPublisher())\n\tif connServer.IsPublisher() {\n\t\tif configure.Config.GetBool(\"rtmp_noauth\") {\n\t\t\tkey, err := configure.RoomKeys.GetKey(name)\n\t\t\tif err != nil {\n\t\t\t\terr := fmt.Errorf(\"Cannot create key err=%s\", err.Error())\n\t\t\t\tconn.Close()\n\t\t\t\tlog.Error(\"GetKey err: \", err)\n\t\t\t\treturn err\n\t\t\t}\n\t\t\tname = key\n\t\t}\n\t\tchannel, err := configure.RoomKeys.GetChannel(name)\n\t\tif err != nil {\n\t\t\terr := fmt.Errorf(\"invalid key err=%s\", err.Error())\n\t\t\tconn.Close()\n\t\t\tlog.Error(\"CheckKey err: \", err)\n\t\t\treturn err\n\t\t}\n\t\tconnServer.PublishInfo.Name = channel\n\t\tif pushlist, ret := configure.GetStaticPushUrlList(appname); ret && (pushlist != nil) {\n\t\t\tlog.Debugf(\"GetStaticPushUrlList: %v\", pushlist)\n\t\t}\n\t\treader := NewVirReader(connServer)\n\t\ts.handler.HandleReader(reader)\n\t\tlog.Debugf(\"new publisher: %+v\", reader.Info())\n\n\t\tif s.getter != nil 
{\n\t\t\twriteType := reflect.TypeOf(s.getter)\n\t\t\tlog.Debugf(\"handleConn:writeType=%v\", writeType)\n\t\t\twriter := s.getter.GetWriter(reader.Info())\n\t\t\ts.handler.HandleWriter(writer)\n\t\t}\n\t\tif configure.Config.GetBool(\"flv_archive\") {\n\t\t\tflvWriter := new(flv.FlvDvr)\n\t\t\ts.handler.HandleWriter(flvWriter.GetWriter(reader.Info()))\n\t\t}\n\t} else {\n\t\twriter := NewVirWriter(connServer)\n\t\tlog.Debugf(\"new player: %+v\", writer.Info())\n\t\ts.handler.HandleWriter(writer)\n\t}\n\n\treturn nil\n}\n\ntype GetInFo interface {\n\tGetInfo() (string, string, string)\n}\n\ntype StreamReadWriteCloser interface {\n\tGetInFo\n\tClose(error)\n\tWrite(core.ChunkStream) error\n\tRead(c *core.ChunkStream) error\n}\n\ntype StaticsBW struct {\n\tStreamId uint32\n\tVideoDatainBytes uint64\n\tLastVideoDatainBytes uint64\n\tVideoSpeedInBytesperMS uint64\n\n\tAudioDatainBytes uint64\n\tLastAudioDatainBytes uint64\n\tAudioSpeedInBytesperMS uint64\n\n\tLastTimestamp int64\n}\n\ntype VirWriter struct {\n\tUid string\n\tclosed bool\n\tav.RWBaser\n\tconn StreamReadWriteCloser\n\tpacketQueue chan *av.Packet\n\tWriteBWInfo StaticsBW\n}\n\nfunc NewVirWriter(conn StreamReadWriteCloser) *VirWriter {\n\tret := &VirWriter{\n\t\tUid: uid.NewId(),\n\t\tconn: conn,\n\t\tRWBaser: av.NewRWBaser(time.Second * time.Duration(writeTimeout)),\n\t\tpacketQueue: make(chan *av.Packet, maxQueueNum),\n\t\tWriteBWInfo: StaticsBW{0, 0, 0, 0, 0, 0, 0, 0},\n\t}\n\n\tgo ret.Check()\n\tgo func() {\n\t\terr := ret.SendPacket()\n\t\tif err != nil {\n\t\t\tlog.Warning(err)\n\t\t}\n\t}()\n\treturn ret\n}\n\nfunc (v *VirWriter) SaveStatics(streamid uint32, length uint64, isVideoFlag bool) {\n\tnowInMS := int64(time.Now().UnixNano() / 1e6)\n\n\tv.WriteBWInfo.StreamId = streamid\n\tif isVideoFlag {\n\t\tv.WriteBWInfo.VideoDatainBytes = v.WriteBWInfo.VideoDatainBytes + length\n\t} else {\n\t\tv.WriteBWInfo.AudioDatainBytes = v.WriteBWInfo.AudioDatainBytes + length\n\t}\n\n\tif 
v.WriteBWInfo.LastTimestamp == 0 {\n\t\tv.WriteBWInfo.LastTimestamp = nowInMS\n\t} else if (nowInMS - v.WriteBWInfo.LastTimestamp) >= SAVE_STATICS_INTERVAL {\n\t\tdiffTimestamp := (nowInMS - v.WriteBWInfo.LastTimestamp) / 1000\n\n\t\tv.WriteBWInfo.VideoSpeedInBytesperMS = (v.WriteBWInfo.VideoDatainBytes - v.WriteBWInfo.LastVideoDatainBytes) * 8 / uint64(diffTimestamp) / 1000\n\t\tv.WriteBWInfo.AudioSpeedInBytesperMS = (v.WriteBWInfo.AudioDatainBytes - v.WriteBWInfo.LastAudioDatainBytes) * 8 / uint64(diffTimestamp) / 1000\n\n\t\tv.WriteBWInfo.LastVideoDatainBytes = v.WriteBWInfo.VideoDatainBytes\n\t\tv.WriteBWInfo.LastAudioDatainBytes = v.WriteBWInfo.AudioDatainBytes\n\t\tv.WriteBWInfo.LastTimestamp = nowInMS\n\t}\n}\n\nfunc (v *VirWriter) Check() {\n\tvar c core.ChunkStream\n\tfor {\n\t\tif err := v.conn.Read(&c); err != nil {\n\t\t\tv.Close(err)\n\t\t\treturn\n\t\t}\n\t}\n}\n\nfunc (v *VirWriter) DropPacket(pktQue chan *av.Packet, info av.Info) {\n\tlog.Warningf(\"[%v] packet queue max!!!\", info)\n\tfor i := 0; i < maxQueueNum-84; i++ {\n\t\ttmpPkt, ok := <-pktQue\n\t\t// try to don't drop audio\n\t\tif ok && tmpPkt.IsAudio {\n\t\t\tif len(pktQue) > maxQueueNum-2 {\n\t\t\t\tlog.Debug(\"drop audio pkt\")\n\t\t\t\t<-pktQue\n\t\t\t} else {\n\t\t\t\tpktQue <- tmpPkt\n\t\t\t}\n\n\t\t}\n\n\t\tif ok && tmpPkt.IsVideo {\n\t\t\tvideoPkt, ok := tmpPkt.Header.(av.VideoPacketHeader)\n\t\t\t// dont't drop sps config and dont't drop key frame\n\t\t\tif ok && (videoPkt.IsSeq() || videoPkt.IsKeyFrame()) {\n\t\t\t\tpktQue <- tmpPkt\n\t\t\t}\n\t\t\tif len(pktQue) > maxQueueNum-10 {\n\t\t\t\tlog.Debug(\"drop video pkt\")\n\t\t\t\t<-pktQue\n\t\t\t}\n\t\t}\n\n\t}\n\tlog.Debug(\"packet queue len: \", len(pktQue))\n}\n\n//\nfunc (v *VirWriter) Write(p *av.Packet) (err error) {\n\terr = nil\n\n\tif v.closed {\n\t\terr = fmt.Errorf(\"VirWriter closed\")\n\t\treturn\n\t}\n\tdefer func() {\n\t\tif e := recover(); e != nil {\n\t\t\terr = fmt.Errorf(\"VirWriter has already been closed:%v\", 
e)\n\t\t}\n\t}()\n\tif len(v.packetQueue) >= maxQueueNum-24 {\n\t\tv.DropPacket(v.packetQueue, v.Info())\n\t} else {\n\t\tv.packetQueue <- p\n\t}\n\n\treturn\n}\n\nfunc (v *VirWriter) SendPacket() error {\n\tFlush := reflect.ValueOf(v.conn).MethodByName(\"Flush\")\n\tvar cs core.ChunkStream\n\tfor {\n\t\tp, ok := <-v.packetQueue\n\t\tif ok {\n\t\t\tcs.Data = p.Data\n\t\t\tcs.Length = uint32(len(p.Data))\n\t\t\tcs.StreamID = p.StreamID\n\t\t\tcs.Timestamp = p.TimeStamp\n\t\t\tcs.Timestamp += v.BaseTimeStamp()\n\n\t\t\tif p.IsVideo {\n\t\t\t\tcs.TypeID = av.TAG_VIDEO\n\t\t\t} else {\n\t\t\t\tif p.IsMetadata {\n\t\t\t\t\tcs.TypeID = av.TAG_SCRIPTDATAAMF0\n\t\t\t\t} else {\n\t\t\t\t\tcs.TypeID = av.TAG_AUDIO\n\t\t\t\t}\n\t\t\t}\n\n\t\t\tv.SaveStatics(p.StreamID, uint64(cs.Length), p.IsVideo)\n\t\t\tv.SetPreTime()\n\t\t\tv.RecTimeStamp(cs.Timestamp, cs.TypeID)\n\t\t\terr := v.conn.Write(cs)\n\t\t\tif err != nil {\n\t\t\t\tv.closed = true\n\t\t\t\treturn err\n\t\t\t}\n\t\t\tFlush.Call(nil)\n\t\t} else {\n\t\t\treturn fmt.Errorf(\"closed\")\n\t\t}\n\n\t}\n}\n\nfunc (v *VirWriter) Info() (ret av.Info) {\n\tret.UID = v.Uid\n\t_, _, URL := v.conn.GetInfo()\n\tret.URL = URL\n\t_url, err := url.Parse(URL)\n\tif err != nil {\n\t\tlog.Warning(err)\n\t}\n\tret.Key = strings.TrimLeft(_url.Path, \"/\")\n\tret.Inter = true\n\treturn\n}\n\nfunc (v *VirWriter) Close(err error) {\n\tlog.Warning(\"player \", v.Info(), \"closed: \"+err.Error())\n\tif !v.closed {\n\t\tclose(v.packetQueue)\n\t}\n\tv.closed = true\n\tv.conn.Close(err)\n}\n\ntype VirReader struct {\n\tUid string\n\tav.RWBaser\n\tdemuxer *flv.Demuxer\n\tconn StreamReadWriteCloser\n\tReadBWInfo StaticsBW\n}\n\nfunc NewVirReader(conn StreamReadWriteCloser) *VirReader {\n\treturn &VirReader{\n\t\tUid: uid.NewId(),\n\t\tconn: conn,\n\t\tRWBaser: av.NewRWBaser(time.Second * time.Duration(writeTimeout)),\n\t\tdemuxer: flv.NewDemuxer(),\n\t\tReadBWInfo: StaticsBW{0, 0, 0, 0, 0, 0, 0, 0},\n\t}\n}\n\nfunc (v *VirReader) 
SaveStatics(streamid uint32, length uint64, isVideoFlag bool) {\n\tnowInMS := int64(time.Now().UnixNano() / 1e6)\n\n\tv.ReadBWInfo.StreamId = streamid\n\tif isVideoFlag {\n\t\tv.ReadBWInfo.VideoDatainBytes = v.ReadBWInfo.VideoDatainBytes + length\n\t} else {\n\t\tv.ReadBWInfo.AudioDatainBytes = v.ReadBWInfo.AudioDatainBytes + length\n\t}\n\n\tif v.ReadBWInfo.LastTimestamp == 0 {\n\t\tv.ReadBWInfo.LastTimestamp = nowInMS\n\t} else if (nowInMS - v.ReadBWInfo.LastTimestamp) >= SAVE_STATICS_INTERVAL {\n\t\tdiffTimestamp := (nowInMS - v.ReadBWInfo.LastTimestamp) / 1000\n\n\t\t//log.Printf(\"now=%d, last=%d, diff=%d\", nowInMS, v.ReadBWInfo.LastTimestamp, diffTimestamp)\n\t\tv.ReadBWInfo.VideoSpeedInBytesperMS = (v.ReadBWInfo.VideoDatainBytes - v.ReadBWInfo.LastVideoDatainBytes) * 8 / uint64(diffTimestamp) / 1000\n\t\tv.ReadBWInfo.AudioSpeedInBytesperMS = (v.ReadBWInfo.AudioDatainBytes - v.ReadBWInfo.LastAudioDatainBytes) * 8 / uint64(diffTimestamp) / 1000\n\n\t\tv.ReadBWInfo.LastVideoDatainBytes = v.ReadBWInfo.VideoDatainBytes\n\t\tv.ReadBWInfo.LastAudioDatainBytes = v.ReadBWInfo.AudioDatainBytes\n\t\tv.ReadBWInfo.LastTimestamp = nowInMS\n\t}\n}\n\nfunc (v *VirReader) Read(p *av.Packet) (err error) {\n\tdefer func() {\n\t\tif r := recover(); r != nil {\n\t\t\tlog.Warning(\"rtmp read packet panic: \", r)\n\t\t}\n\t}()\n\n\tv.SetPreTime()\n\tvar cs core.ChunkStream\n\tfor {\n\t\terr = v.conn.Read(&cs)\n\t\tif err != nil {\n\t\t\treturn err\n\t\t}\n\t\tif cs.TypeID == av.TAG_AUDIO ||\n\t\t\tcs.TypeID == av.TAG_VIDEO ||\n\t\t\tcs.TypeID == av.TAG_SCRIPTDATAAMF0 ||\n\t\t\tcs.TypeID == av.TAG_SCRIPTDATAAMF3 {\n\t\t\tbreak\n\t\t}\n\t}\n\n\tp.IsAudio = cs.TypeID == av.TAG_AUDIO\n\tp.IsVideo = cs.TypeID == av.TAG_VIDEO\n\tp.IsMetadata = cs.TypeID == av.TAG_SCRIPTDATAAMF0 || cs.TypeID == av.TAG_SCRIPTDATAAMF3\n\tp.StreamID = cs.StreamID\n\tp.Data = cs.Data\n\tp.TimeStamp = cs.Timestamp\n\n\tv.SaveStatics(p.StreamID, uint64(len(p.Data)), p.IsVideo)\n\tv.demuxer.DemuxH(p)\n\treturn 
err\n}\n\nfunc (v *VirReader) Info() (ret av.Info) {\n\tret.UID = v.Uid\n\t_, _, URL := v.conn.GetInfo()\n\tret.URL = URL\n\t_url, err := url.Parse(URL)\n\tif err != nil {\n\t\tlog.Warning(err)\n\t\t// _url is nil when parsing fails, so return before dereferencing it\n\t\treturn\n\t}\n\tret.Key = strings.TrimLeft(_url.Path, \"/\")\n\treturn\n}\n\nfunc (v *VirReader) Close(err error) {\n\tlog.Debug(\"publisher \", v.Info(), \"closed: \"+err.Error())\n\tv.conn.Close(err)\n}","attrs":{}}]},{"type":"heading","attrs":{"align":null,"level":4},"content":[{"type":"text","text":"Playback","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Video streams can be played over several protocols: RTMP, FLV, and HLS. Let's start with RTMP:","attrs":{}}]},{"type":"codeblock","attrs":{"lang":"text"},"content":[{"type":"text","text":"var (\n\tSTOP_CTRL = \"RTMPRELAY_STOP\"\n)\n\ntype RtmpRelay struct {\n\tPlayUrl string\n\tPublishUrl string\n\tcs_chan chan core.ChunkStream\n\tsndctrl_chan chan string\n\tconnectPlayClient *core.ConnClient\n\tconnectPublishClient *core.ConnClient\n\tstartflag bool\n}\n\nfunc NewRtmpRelay(playurl *string, publishurl *string) *RtmpRelay {\n\treturn &RtmpRelay{\n\t\tPlayUrl: *playurl,\n\t\tPublishUrl: *publishurl,\n\t\tcs_chan: make(chan core.ChunkStream, 500),\n\t\tsndctrl_chan: make(chan string),\n\t\tconnectPlayClient: nil,\n\t\tconnectPublishClient: nil,\n\t\tstartflag: false,\n\t}\n}\n\nfunc (self *RtmpRelay) rcvPlayChunkStream() {\n\tlog.Debug(\"rcvPlayRtmpMediaPacket connectClient.Read...\")\n\tfor {\n\t\tvar rc core.ChunkStream\n\n\t\tif self.startflag == false {\n\t\t\tself.connectPlayClient.Close(nil)\n\t\t\tlog.Debugf(\"rcvPlayChunkStream close: playurl=%s, publishurl=%s\", self.PlayUrl, self.PublishUrl)\n\t\t\tbreak\n\t\t}\n\t\terr := self.connectPlayClient.Read(&rc)\n\n\t\tif err != nil && err == io.EOF {\n\t\t\tbreak\n\t\t}\n\t\t//log.Debugf(\"connectPlayClient.Read return rc.TypeID=%v length=%d, err=%v\", rc.TypeID, len(rc.Data), err)\n\t\tswitch rc.TypeID {\n\t\tcase 
20, 17:\n\t\t\tr := bytes.NewReader(rc.Data)\n\t\t\tvs, err := self.connectPlayClient.DecodeBatch(r, amf.AMF0)\n\n\t\t\tlog.Debugf(\"rcvPlayRtmpMediaPacket: vs=%v, err=%v\", vs, err)\n\t\tcase 18:\n\t\t\tlog.Debug(\"rcvPlayRtmpMediaPacket: metadata....\")\n\t\tcase 8, 9:\n\t\t\tself.cs_chan <- rc\n\t\t}\n\t}\n}\n\nfunc (self *RtmpRelay) sendPublishChunkStream() {\n\tfor {\n\t\tselect {\n\t\tcase rc := <-self.cs_chan:\n\t\t\t//log.Debugf(\"sendPublishChunkStream: rc.TypeID=%v length=%d\", rc.TypeID, len(rc.Data))\n\t\t\tself.connectPublishClient.Write(rc)\n\t\tcase ctrlcmd := <-self.sndctrl_chan:\n\t\t\tif ctrlcmd == STOP_CTRL {\n\t\t\t\tself.connectPublishClient.Close(nil)\n\t\t\t\tlog.Debugf(\"sendPublishChunkStream close: playurl=%s, publishurl=%s\", self.PlayUrl, self.PublishUrl)\n\t\t\t\treturn\n\t\t\t}\n\t\t}\n\t}\n}\n\nfunc (self *RtmpRelay) Start() error {\n\tif self.startflag {\n\t\treturn fmt.Errorf(\"The rtmprelay already started, playurl=%s, publishurl=%s\\n\", self.PlayUrl, self.PublishUrl)\n\t}\n\n\tself.connectPlayClient = core.NewConnClient()\n\tself.connectPublishClient = core.NewConnClient()\n\n\tlog.Debugf(\"play server addr:%v starting....\", self.PlayUrl)\n\terr := self.connectPlayClient.Start(self.PlayUrl, av.PLAY)\n\tif err != nil {\n\t\tlog.Debugf(\"connectPlayClient.Start url=%v error\", self.PlayUrl)\n\t\treturn err\n\t}\n\n\tlog.Debugf(\"publish server addr:%v starting....\", self.PublishUrl)\n\terr = self.connectPublishClient.Start(self.PublishUrl, av.PUBLISH)\n\tif err != nil {\n\t\tlog.Debugf(\"connectPublishClient.Start url=%v error\", self.PublishUrl)\n\t\tself.connectPlayClient.Close(nil)\n\t\treturn err\n\t}\n\n\tself.startflag = true\n\tgo self.rcvPlayChunkStream()\n\tgo self.sendPublishChunkStream()\n\n\treturn nil\n}\n\nfunc (self *RtmpRelay) Stop() {\n\tif !self.startflag {\n\t\tlog.Debugf(\"The rtmprelay already stoped, playurl=%s, publishurl=%s\", self.PlayUrl, self.PublishUrl)\n\t\treturn\n\t}\n\n\tself.startflag = 
false\n\tself.sndctrl_chan <- STOP_CTRL\n}","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"With RTMP, chunk streams are handed from one goroutine to another over a channel (chan). This makes good use of the server's CPU, and because the channel is buffered and bounded, it also guards against the CPU being exhausted or memory overflowing, either of which would eventually crash the service. Next, let's look at HLS. HLS is an adaptive-bitrate streaming protocol that also runs over HTTP. Let's first see how it caches and flushes data:","attrs":{}}]},{"type":"codeblock","attrs":{"lang":"text"},"content":[{"type":"text","text":"const (\n\tvideoHZ = 90000\n\taacSampleLen = 1024\n\tmaxQueueNum = 512\n\n\th264_default_hz uint64 = 90\n)\n\ntype Source struct {\n\tav.RWBaser\n\tseq int\n\tinfo av.Info\n\tbwriter *bytes.Buffer\n\tbtswriter *bytes.Buffer\n\tdemuxer *flv.Demuxer\n\tmuxer *ts.Muxer\n\tpts, dts uint64\n\tstat *status\n\talign *align\n\tcache *audioCache\n\ttsCache *TSCacheItem\n\ttsparser *parser.CodecParser\n\tclosed bool\n\tpacketQueue chan *av.Packet\n}\n\nfunc NewSource(info av.Info) *Source {\n\tinfo.Inter = true\n\ts := &Source{\n\t\tinfo: info,\n\t\talign: &align{},\n\t\tstat: newStatus(),\n\t\tRWBaser: av.NewRWBaser(time.Second * 10),\n\t\tcache: newAudioCache(),\n\t\tdemuxer: flv.NewDemuxer(),\n\t\tmuxer: ts.NewMuxer(),\n\t\ttsCache: NewTSCacheItem(info.Key),\n\t\ttsparser: parser.NewCodecParser(),\n\t\tbwriter: bytes.NewBuffer(make([]byte, 100*1024)),\n\t\tpacketQueue: make(chan *av.Packet, maxQueueNum),\n\t}\n\tgo func() {\n\t\terr := s.SendPacket()\n\t\tif err != nil {\n\t\t\tlog.Debug(\"send packet error: \", err)\n\t\t\ts.closed = true\n\t\t}\n\t}()\n\treturn s\n}\n\nfunc (source *Source) GetCacheInc() *TSCacheItem {\n\treturn source.tsCache\n}\n\nfunc (source *Source) DropPacket(pktQue chan *av.Packet, info av.Info) {\n\tlog.Warningf(\"[%v] packet queue max!!!\", info)\n\tfor i := 0; i < maxQueueNum-84; i++ {\n\t\ttmpPkt, 
ok := <-pktQue\n\t\t// try to don't drop audio\n\t\tif ok && tmpPkt.IsAudio {\n\t\t\tif len(pktQue) > maxQueueNum-2 {\n\t\t\t\t<-pktQue\n\t\t\t} else {\n\t\t\t\tpktQue <- tmpPkt\n\t\t\t}\n\t\t}\n\n\t\tif ok && tmpPkt.IsVideo {\n\t\t\tvideoPkt, ok := tmpPkt.Header.(av.VideoPacketHeader)\n\t\t\t// dont't drop sps config and dont't drop key frame\n\t\t\tif ok && (videoPkt.IsSeq() || videoPkt.IsKeyFrame()) {\n\t\t\t\tpktQue <- tmpPkt\n\t\t\t}\n\t\t\tif len(pktQue) > maxQueueNum-10 {\n\t\t\t\t<-pktQue\n\t\t\t}\n\t\t}\n\n\t}\n\tlog.Debug(\"packet queue len: \", len(pktQue))\n}\n\nfunc (source *Source) Write(p *av.Packet) (err error) {\n\terr = nil\n\tif source.closed {\n\t\terr = fmt.Errorf(\"hls source closed\")\n\t\treturn\n\t}\n\tsource.SetPreTime()\n\tdefer func() {\n\t\tif e := recover(); e != nil {\n\t\t\terr = fmt.Errorf(\"hls source has already been closed:%v\", e)\n\t\t}\n\t}()\n\tif len(source.packetQueue) >= maxQueueNum-24 {\n\t\tsource.DropPacket(source.packetQueue, source.info)\n\t} else {\n\t\tif !source.closed {\n\t\t\tsource.packetQueue <- p\n\t\t}\n\t}\n\treturn\n}\n\nfunc (source *Source) SendPacket() error {\n\tdefer func() {\n\t\tlog.Debugf(\"[%v] hls sender stop\", source.info)\n\t\tif r := recover(); r != nil {\n\t\t\tlog.Warning(\"hls SendPacket panic: \", r)\n\t\t}\n\t}()\n\n\tlog.Debugf(\"[%v] hls sender start\", source.info)\n\tfor {\n\t\tif source.closed {\n\t\t\treturn fmt.Errorf(\"closed\")\n\t\t}\n\n\t\tp, ok := <-source.packetQueue\n\t\tif ok {\n\t\t\tif p.IsMetadata {\n\t\t\t\tcontinue\n\t\t\t}\n\n\t\t\terr := source.demuxer.Demux(p)\n\t\t\tif err == flv.ErrAvcEndSEQ {\n\t\t\t\tlog.Warning(err)\n\t\t\t\tcontinue\n\t\t\t} else {\n\t\t\t\tif err != nil {\n\t\t\t\t\tlog.Warning(err)\n\t\t\t\t\treturn err\n\t\t\t\t}\n\t\t\t}\n\t\t\tcompositionTime, isSeq, err := source.parse(p)\n\t\t\tif err != nil {\n\t\t\t\tlog.Warning(err)\n\t\t\t}\n\t\t\tif err != nil || isSeq {\n\t\t\t\tcontinue\n\t\t\t}\n\t\t\tif source.btswriter != nil 
{\n\t\t\t\tsource.stat.update(p.IsVideo, p.TimeStamp)\n\t\t\t\tsource.calcPtsDts(p.IsVideo, p.TimeStamp, uint32(compositionTime))\n\t\t\t\tsource.tsMux(p)\n\t\t\t}\n\t\t} else {\n\t\t\treturn fmt.Errorf(\"closed\")\n\t\t}\n\t}\n}\n\nfunc (source *Source) Info() (ret av.Info) {\n\treturn source.info\n}\n\nfunc (source *Source) cleanup() {\n\tclose(source.packetQueue)\n\tsource.bwriter = nil\n\tsource.btswriter = nil\n\tsource.cache = nil\n\tsource.tsCache = nil\n}\n\nfunc (source *Source) Close(err error) {\n\tlog.Debug(\"hls source closed: \", source.info)\n\tif !source.closed && !configure.Config.GetBool(\"hls_keep_after_end\") {\n\t\tsource.cleanup()\n\t}\n\tsource.closed = true\n}\n\nfunc (source *Source) cut() {\n\tnewf := true\n\tif source.btswriter == nil {\n\t\tsource.btswriter = bytes.NewBuffer(nil)\n\t} else if source.btswriter != nil && source.stat.durationMs() >= duration {\n\t\tsource.flushAudio()\n\n\t\tsource.seq++\n\t\tfilename := fmt.Sprintf(\"/%s/%d.ts\", source.info.Key, time.Now().Unix())\n\t\titem := NewTSItem(filename, int(source.stat.durationMs()), source.seq, source.btswriter.Bytes())\n\t\tsource.tsCache.SetItem(filename, item)\n\n\t\tsource.btswriter.Reset()\n\t\tsource.stat.resetAndNew()\n\t} else {\n\t\tnewf = false\n\t}\n\tif newf {\n\t\tsource.btswriter.Write(source.muxer.PAT())\n\t\tsource.btswriter.Write(source.muxer.PMT(av.SOUND_AAC, true))\n\t}\n}\n\nfunc (source *Source) parse(p *av.Packet) (int32, bool, error) {\n\tvar compositionTime int32\n\tvar ah av.AudioPacketHeader\n\tvar vh av.VideoPacketHeader\n\tif p.IsVideo {\n\t\tvh = p.Header.(av.VideoPacketHeader)\n\t\tif vh.CodecID() != av.VIDEO_H264 {\n\t\t\treturn compositionTime, false, ErrNoSupportVideoCodec\n\t\t}\n\t\tcompositionTime = vh.CompositionTime()\n\t\tif vh.IsKeyFrame() && vh.IsSeq() {\n\t\t\treturn compositionTime, true, source.tsparser.Parse(p, source.bwriter)\n\t\t}\n\t} else {\n\t\tah = p.Header.(av.AudioPacketHeader)\n\t\tif ah.SoundFormat() != av.SOUND_AAC 
{\n\t\t\treturn compositionTime, false, ErrNoSupportAudioCodec\n\t\t}\n\t\tif ah.AACPacketType() == av.AAC_SEQHDR {\n\t\t\treturn compositionTime, true, source.tsparser.Parse(p, source.bwriter)\n\t\t}\n\t}\n\tsource.bwriter.Reset()\n\tif err := source.tsparser.Parse(p, source.bwriter); err != nil {\n\t\treturn compositionTime, false, err\n\t}\n\tp.Data = source.bwriter.Bytes()\n\n\tif p.IsVideo && vh.IsKeyFrame() {\n\t\tsource.cut()\n\t}\n\treturn compositionTime, false, nil\n}\n\nfunc (source *Source) calcPtsDts(isVideo bool, ts, compositionTs uint32) {\n\tsource.dts = uint64(ts) * h264_default_hz\n\tif isVideo {\n\t\tsource.pts = source.dts + uint64(compositionTs)*h264_default_hz\n\t} else {\n\t\tsampleRate, _ := source.tsparser.SampleRate()\n\t\tsource.align.align(&source.dts, uint32(videoHZ*aacSampleLen/sampleRate))\n\t\tsource.pts = source.dts\n\t}\n}\n\nfunc (source *Source) flushAudio() error {\n\treturn source.muxAudio(1)\n}\n\nfunc (source *Source) muxAudio(limit byte) error {\n\tif source.cache.CacheNum() < limit {\n\t\treturn nil\n\t}\n\tvar p av.Packet\n\t_, pts, buf := source.cache.GetFrame()\n\tp.Data = buf\n\tp.TimeStamp = uint32(pts / h264_default_hz)\n\treturn source.muxer.Mux(&p, source.btswriter)\n}\n\nfunc (source *Source) tsMux(p *av.Packet) error {\n\tif p.IsVideo {\n\t\treturn source.muxer.Mux(p, source.btswriter)\n\t} else {\n\t\tsource.cache.Cache(p.Data, source.pts)\n\t\treturn source.muxAudio(cache_max_frames)\n\t}\n}","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"HLS delivery itself still follows the HTTP protocol:","attrs":{}}]},{"type":"codeblock","attrs":{"lang":"text"},"content":[{"type":"text","text":"const (\n\tduration = 3000\n)\n\nvar (\n\tErrNoPublisher = fmt.Errorf(\"no publisher\")\n\tErrInvalidReq = fmt.Errorf(\"invalid req url path\")\n\tErrNoSupportVideoCodec = fmt.Errorf(\"no support video codec\")\n\tErrNoSupportAudioCodec = fmt.Errorf(\"no 
support audio codec\")\n)\n\n// Cross-domain policy served at /crossdomain.xml (allows access from any domain)\nvar crossdomainxml = []byte(`<?xml version=\"1.0\" ?>\n<cross-domain-policy>\n\t<allow-access-from domain=\"*\" />\n\t<allow-http-request-headers-from domain=\"*\" headers=\"*\"/>\n</cross-domain-policy>`)\n\ntype Server struct {\n\tlistener net.Listener\n\tconns *sync.Map\n}\n\nfunc NewServer() *Server {\n\tret := &Server{\n\t\tconns: &sync.Map{},\n\t}\n\tgo ret.checkStop()\n\treturn ret\n}\n\nfunc (server *Server) Serve(listener net.Listener) error {\n\tmux := http.NewServeMux()\n\tmux.HandleFunc(\"/\", func(w http.ResponseWriter, r *http.Request) {\n\t\tserver.handle(w, r)\n\t})\n\tserver.listener = listener\n\t// propagate the serve error instead of discarding it\n\treturn http.Serve(listener, mux)\n}\n\nfunc (server *Server) GetWriter(info av.Info) av.WriteCloser {\n\tvar s *Source\n\tv, ok := server.conns.Load(info.Key)\n\tif !ok {\n\t\tlog.Debug(\"new hls source\")\n\t\ts = NewSource(info)\n\t\tserver.conns.Store(info.Key, s)\n\t} else {\n\t\ts = v.(*Source)\n\t}\n\treturn s\n}\n\nfunc (server *Server) getConn(key string) *Source {\n\tv, ok := server.conns.Load(key)\n\tif !ok {\n\t\treturn nil\n\t}\n\treturn v.(*Source)\n}\n\nfunc (server *Server) checkStop() {\n\tfor {\n\t\t<-time.After(5 * time.Second)\n\n\t\tserver.conns.Range(func(key, val interface{}) bool {\n\t\t\tv := val.(*Source)\n\t\t\tif !v.Alive() && !configure.Config.GetBool(\"hls_keep_after_end\") {\n\t\t\t\tlog.Debug(\"check stop and remove: \", v.Info())\n\t\t\t\tserver.conns.Delete(key)\n\t\t\t}\n\t\t\treturn true\n\t\t})\n\t}\n}\n\nfunc (server *Server) handle(w http.ResponseWriter, r *http.Request) {\n\tif path.Base(r.URL.Path) == \"crossdomain.xml\" {\n\t\tw.Header().Set(\"Content-Type\", \"application/xml\")\n\t\tw.Write(crossdomainxml)\n\t\treturn\n\t}\n\tswitch path.Ext(r.URL.Path) {\n\tcase \".m3u8\":\n\t\tkey, _ := server.parseM3u8(r.URL.Path)\n\t\tconn := server.getConn(key)\n\t\tif conn == nil {\n\t\t\thttp.Error(w, ErrNoPublisher.Error(), http.StatusForbidden)\n\t\t\treturn\n\t\t}\n\t\ttsCache := conn.GetCacheInc()\n\t\tif tsCache == nil {\n\t\t\thttp.Error(w, ErrNoPublisher.Error(), http.StatusForbidden)\n\t\t\treturn\n\t\t}\n\t\tbody, err := 
tsCache.GenM3U8PlayList()\n\t\tif err != nil {\n\t\t\tlog.Debug(\"GenM3U8PlayList error: \", err)\n\t\t\thttp.Error(w, err.Error(), http.StatusBadRequest)\n\t\t\treturn\n\t\t}\n\n\t\tw.Header().Set(\"Access-Control-Allow-Origin\", \"*\")\n\t\tw.Header().Set(\"Cache-Control\", \"no-cache\")\n\t\tw.Header().Set(\"Content-Type\", \"application/x-mpegURL\")\n\t\tw.Header().Set(\"Content-Length\", strconv.Itoa(len(body)))\n\t\tw.Write(body)\n\tcase \".ts\":\n\t\tkey, _ := server.parseTs(r.URL.Path)\n\t\tconn := server.getConn(key)\n\t\tif conn == nil {\n\t\t\thttp.Error(w, ErrNoPublisher.Error(), http.StatusForbidden)\n\t\t\treturn\n\t\t}\n\t\ttsCache := conn.GetCacheInc()\n\t\titem, err := tsCache.GetItem(r.URL.Path)\n\t\tif err != nil {\n\t\t\tlog.Debug(\"GetItem error: \", err)\n\t\t\thttp.Error(w, err.Error(), http.StatusBadRequest)\n\t\t\treturn\n\t\t}\n\t\tw.Header().Set(\"Access-Control-Allow-Origin\", \"*\")\n\t\t// video/mp2t is the registered MIME type for MPEG-2 transport streams\n\t\tw.Header().Set(\"Content-Type\", \"video/mp2t\")\n\t\tw.Header().Set(\"Content-Length\", strconv.Itoa(len(item.Data)))\n\t\tw.Write(item.Data)\n\t}\n}\n\nfunc (server *Server) parseM3u8(pathstr string) (key string, err error) {\n\tpathstr = strings.TrimLeft(pathstr, \"/\")\n\tkey = strings.Split(pathstr, path.Ext(pathstr))[0]\n\treturn\n}\n\nfunc (server *Server) parseTs(pathstr string) (key string, err error) {\n\tpathstr = strings.TrimLeft(pathstr, \"/\")\n\tpaths := strings.SplitN(pathstr, \"/\", 3)\n\tif len(paths) != 3 {\n\t\terr = fmt.Errorf(\"invalid path=%s\", pathstr)\n\t\treturn\n\t}\n\tkey = paths[0] + \"/\" + paths[1]\n\n\treturn\n}","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Finally, let's look at HTTP-FLV, a streaming protocol that combines RTMP's low latency with the ability to reuse existing HTTP 
distribution infrastructure.","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"An FLV stream consists of a header and a body, and the body is made up of tags such as AudioTag and VideoTag. Carrying it over HTTP has these advantages:","attrs":{}}]},{"type":"bulletedlist","content":[{"type":"listitem","attrs":{"listStyle":null},"content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"It avoids firewall interference to a certain extent","attrs":{}}]}]},{"type":"listitem","attrs":{"listStyle":null},"content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"HTTPS can be used to provide an encrypted channel","attrs":{}}]}]},{"type":"listitem","attrs":{"listStyle":null},"content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"It works well with HTTP 302 redirects, which allows flexible scheduling","attrs":{}}]}]},{"type":"listitem","attrs":{"listStyle":null},"content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Clients on iOS and Android can support it","attrs":{}}]}]}],"attrs":{}},{"type":"codeblock","attrs":{"lang":"text"},"content":[{"type":"text","text":"func (server *Server) getStreams(w http.ResponseWriter, r *http.Request) *streams {\n\trtmpStream := server.handler.(*rtmp.RtmpStream)\n\tif rtmpStream == nil {\n\t\treturn nil\n\t}\n\tmsgs := new(streams)\n\n\trtmpStream.GetStreams().Range(func(key, val interface{}) bool {\n\t\tif s, ok := val.(*rtmp.Stream); ok {\n\t\t\tif s.GetReader() != nil {\n\t\t\t\tmsg := stream{key.(string), s.GetReader().Info().UID}\n\t\t\t\tmsgs.Publishers = append(msgs.Publishers, msg)\n\t\t\t}\n\t\t}\n\t\treturn true\n\t})\n\n\trtmpStream.GetStreams().Range(func(key, val interface{}) bool {\n\t\tws := val.(*rtmp.Stream).GetWs()\n\n\t\tws.Range(func(k, v interface{}) bool {\n\t\t\tif pw, ok := 
v.(*rtmp.PackWriterCloser); ok {\n\t\t\t\tif pw.GetWriter() != nil {\n\t\t\t\t\tmsg := stream{key.(string), pw.GetWriter().Info().UID}\n\t\t\t\t\tmsgs.Players = append(msgs.Players, msg)\n\t\t\t\t}\n\t\t\t}\n\t\t\treturn true\n\t\t})\n\t\treturn true\n\t})\n\n\treturn msgs\n}\n\nfunc (server *Server) getStream(w http.ResponseWriter, r *http.Request) {\n\tmsgs := server.getStreams(w, r)\n\tif msgs == nil {\n\t\treturn\n\t}\n\tresp, _ := json.Marshal(msgs)\n\tw.Header().Set(\"Content-Type\", \"application/json\")\n\tw.Write(resp)\n}\n\nfunc (server *Server) handleConn(w http.ResponseWriter, r *http.Request) {\n\tdefer func() {\n\t\tif r := recover(); r != nil {\n\t\t\tlog.Error(\"http flv handleConn panic: \", r)\n\t\t}\n\t}()\n\n\turl := r.URL.String()\n\tu := r.URL.Path\n\tif pos := strings.LastIndex(u, \".\"); pos < 0 || u[pos:] != \".flv\" {\n\t\thttp.Error(w, \"invalid path\", http.StatusBadRequest)\n\t\treturn\n\t}\n\tpath := strings.TrimSuffix(strings.TrimLeft(u, \"/\"), \".flv\")\n\tpaths := strings.SplitN(path, \"/\", 2)\n\tlog.Debug(\"url:\", u, \"path:\", path, \"paths:\", paths)\n\n\tif len(paths) != 2 {\n\t\thttp.Error(w, \"invalid path\", http.StatusBadRequest)\n\t\treturn\n\t}\n\n\t// Check whether the stream has been published; if not, return 404 right away\n\tmsgs := server.getStreams(w, r)\n\tif msgs == nil || len(msgs.Publishers) == 0 {\n\t\thttp.Error(w, \"invalid path\", http.StatusNotFound)\n\t\treturn\n\t} else {\n\t\tinclude := false\n\t\tfor _, item := range msgs.Publishers {\n\t\t\tif item.Key == path {\n\t\t\t\tinclude = true\n\t\t\t\tbreak\n\t\t\t}\n\t\t}\n\t\tif !include {\n\t\t\thttp.Error(w, \"invalid path\", http.StatusNotFound)\n\t\t\treturn\n\t\t}\n\t}\n\n\tw.Header().Set(\"Access-Control-Allow-Origin\", \"*\")\n\twriter := NewFLVWriter(paths[0], paths[1], url, 
w)\n\n\tserver.handler.HandleWriter(writer)\n\twriter.Wait()\n}","attrs":{}}]},{"type":"heading","attrs":{"align":null,"level":4},"content":[{"type":"text","text":"Streaming video container formats","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"A container is a standard for packaging together the multimedia content produced by encoders (video, audio, subtitles, chapter information, and so on). Containers make it easy to play the different kinds of multimedia content in sync.","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"A video format is an identifier that playback software relies on in order to play a video file. In short, the video format defines the protocol for communicating with the player.","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"As for container formats, we support FLV and TS. FLV is widely used, for example by iQIYI and short-video apps. The FLV format is also relatively simple and consists of two main parts: the FLV header and the FLV body. 
So both the header and the body need to be parsed:","attrs":{}}]},{"type":"codeblock","attrs":{"lang":"text"},"content":[{"type":"text","text":"const (\n\theaderLen = 11\n)\n\ntype FLVWriter struct {\n\tUid string\n\tav.RWBaser\n\tapp, title, url string\n\tbuf []byte\n\tclosed chan struct{}\n\tctx *os.File\n\tclosedWriter bool\n}\n\nfunc NewFLVWriter(app, title, url string, ctx *os.File) *FLVWriter {\n\tret := &FLVWriter{\n\t\tUid: uid.NewId(),\n\t\tapp: app,\n\t\ttitle: title,\n\t\turl: url,\n\t\tctx: ctx,\n\t\tRWBaser: av.NewRWBaser(time.Second * 10),\n\t\tclosed: make(chan struct{}),\n\t\tbuf: make([]byte, headerLen),\n\t}\n\n\tret.ctx.Write(flvHeader)\n\tpio.PutI32BE(ret.buf[:4], 0)\n\tret.ctx.Write(ret.buf[:4])\n\n\treturn ret\n}\n\nfunc (writer *FLVWriter) Write(p *av.Packet) error {\n\twriter.RWBaser.SetPreTime()\n\th := writer.buf[:headerLen]\n\ttypeID := av.TAG_VIDEO\n\tif !p.IsVideo {\n\t\tif p.IsMetadata {\n\t\t\tvar err error\n\t\t\ttypeID = av.TAG_SCRIPTDATAAMF0\n\t\t\tp.Data, err = amf.MetaDataReform(p.Data, amf.DEL)\n\t\t\tif err != nil {\n\t\t\t\treturn err\n\t\t\t}\n\t\t} else {\n\t\t\ttypeID = av.TAG_AUDIO\n\t\t}\n\t}\n\tdataLen := len(p.Data)\n\ttimestamp := p.TimeStamp\n\ttimestamp += writer.BaseTimeStamp()\n\twriter.RWBaser.RecTimeStamp(timestamp, uint32(typeID))\n\n\tpreDataLen := dataLen + headerLen\n\ttimestampbase := timestamp & 0xffffff\n\ttimestampExt := timestamp >> 24 & 0xff\n\n\tpio.PutU8(h[0:1], uint8(typeID))\n\tpio.PutI24BE(h[1:4], int32(dataLen))\n\tpio.PutI24BE(h[4:7], int32(timestampbase))\n\tpio.PutU8(h[7:8], uint8(timestampExt))\n\n\tif _, err := writer.ctx.Write(h); err != nil {\n\t\treturn err\n\t}\n\n\tif _, err := writer.ctx.Write(p.Data); err != nil {\n\t\treturn err\n\t}\n\n\tpio.PutI32BE(h[:4], int32(preDataLen))\n\tif _, err := writer.ctx.Write(h[:4]); err != nil {\n\t\treturn err\n\t}\n\n\treturn nil\n}\n\n// Wait blocks until the writer has been closed\nfunc (writer *FLVWriter) Wait() {\n\t<-writer.closed\n}\n\nfunc (writer 
*FLVWriter) Close(error) {\n\tif writer.closedWriter {\n\t\treturn\n\t}\n\twriter.closedWriter = true\n\twriter.ctx.Close()\n\tclose(writer.closed)\n}\n\nfunc (writer *FLVWriter) Info() (ret av.Info) {\n\tret.UID = writer.Uid\n\tret.URL = writer.url\n\tret.Key = writer.app + \"/\" + writer.title\n\treturn\n}\n\ntype FlvDvr struct{}\n\nfunc (f *FlvDvr) GetWriter(info av.Info) av.WriteCloser {\n\tpaths := strings.SplitN(info.Key, \"/\", 2)\n\tif len(paths) != 2 {\n\t\tlog.Warning(\"invalid info\")\n\t\treturn nil\n\t}\n\n\tflvDir := configure.Config.GetString(\"flv_dir\")\n\n\terr := os.MkdirAll(path.Join(flvDir, paths[0]), 0755)\n\tif err != nil {\n\t\tlog.Error(\"mkdir error: \", err)\n\t\treturn nil\n\t}\n\n\tfileName := fmt.Sprintf(\"%s_%d.%s\", path.Join(flvDir, info.Key), time.Now().Unix(), \"flv\")\n\tlog.Debug(\"flv dvr save stream to: \", fileName)\n\tw, err := os.OpenFile(fileName, os.O_CREATE|os.O_RDWR, 0755)\n\tif err != nil {\n\t\tlog.Error(\"open file error: \", err)\n\t\treturn nil\n\t}\n\n\twriter := NewFLVWriter(paths[0], paths[1], info.URL, w)\n\tlog.Debug(\"new flv dvr: \", writer.Info())\n\treturn writer\n}","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Each FLV tag carries audio, video, or script data, along with optional encryption metadata and the payload. In other words, there are three tag types: audio, video, and script. Here is how they are parsed:","attrs":{}}]},{"type":"codeblock","attrs":{"lang":"text"},"content":[{"type":"text","text":"type Tag struct {\n\tflvt flvTag\n\tmediat mediaTag\n}\n\nfunc (tag *Tag) SoundFormat() uint8 {\n\treturn tag.mediat.soundFormat\n}\n\nfunc (tag *Tag) AACPacketType() uint8 {\n\treturn tag.mediat.aacPacketType\n}\n\nfunc (tag *Tag) IsKeyFrame() bool {\n\treturn tag.mediat.frameType == av.FRAME_KEY\n}\n\nfunc (tag *Tag) IsSeq() bool {\n\treturn tag.mediat.frameType == av.FRAME_KEY &&\n\t\ttag.mediat.avcPacketType == 
av.AVC_SEQHDR\n}\n\nfunc (tag *Tag) CodecID() uint8 {\n\treturn tag.mediat.codecID\n}\n\nfunc (tag *Tag) CompositionTime() int32 {\n\treturn tag.mediat.compositionTime\n}\n\n// ParseMediaTagHeader parses the audio or video tag header\nfunc (tag *Tag) ParseMediaTagHeader(b []byte, isVideo bool) (n int, err error) {\n\tswitch isVideo {\n\tcase false:\n\t\tn, err = tag.parseAudioHeader(b)\n\tcase true:\n\t\tn, err = tag.parseVideoHeader(b)\n\t}\n\treturn\n}\n\nfunc (tag *Tag) parseAudioHeader(b []byte) (n int, err error) {\n\tif len(b) < n+1 {\n\t\terr = fmt.Errorf(\"invalid audiodata len=%d\", len(b))\n\t\treturn\n\t}\n\tflags := b[0]\n\ttag.mediat.soundFormat = flags >> 4\n\ttag.mediat.soundRate = (flags >> 2) & 0x3\n\ttag.mediat.soundSize = (flags >> 1) & 0x1\n\ttag.mediat.soundType = flags & 0x1\n\tn++\n\tswitch tag.mediat.soundFormat {\n\tcase av.SOUND_AAC:\n\t\t// AAC carries one extra byte (the packet type); make sure it is present\n\t\tif len(b) < n+1 {\n\t\t\terr = fmt.Errorf(\"invalid aac audiodata len=%d\", len(b))\n\t\t\treturn\n\t\t}\n\t\ttag.mediat.aacPacketType = b[1]\n\t\tn++\n\t}\n\treturn\n}\n\nfunc (tag *Tag) parseVideoHeader(b []byte) (n int, err error) {\n\tif len(b) < n+5 {\n\t\terr = fmt.Errorf(\"invalid videodata len=%d\", len(b))\n\t\treturn\n\t}\n\tflags := b[0]\n\ttag.mediat.frameType = flags >> 4\n\ttag.mediat.codecID = flags & 0xf\n\tn++\n\tif tag.mediat.frameType == av.FRAME_INTER || tag.mediat.frameType == av.FRAME_KEY {\n\t\ttag.mediat.avcPacketType = b[1]\n\t\t// reset first so a reused Tag does not accumulate stale bits\n\t\ttag.mediat.compositionTime = 0\n\t\tfor i := 2; i < 5; i++ {\n\t\t\ttag.mediat.compositionTime = tag.mediat.compositionTime<<8 + int32(b[i])\n\t\t}\n\t\tn += 4\n\t}\n\treturn\n}","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"In data transmission and data storage, error detection is needed to make sure the data is correct. CRC is the best known of the many error-detection methods: it detects errors very effectively, has low overhead, and is easy to implement in encoders and detection circuits. 
So when muxing the TS format here, CRC32 is used as the checksum:","attrs":{}}]},{"type":"codeblock","attrs":{"lang":"text"},"content":[{"type":"text","text":"func (muxer *Muxer) Mux(p *av.Packet, w io.Writer) error {\n\tfirst := true\n\twBytes := 0\n\tpesIndex := 0\n\ttmpLen := byte(0)\n\tdataLen := byte(0)\n\n\tvar pes pesHeader\n\tdts := int64(p.TimeStamp) * int64(h264DefaultHZ)\n\tpts := dts\n\tpid := audioPID\n\tvar videoH av.VideoPacketHeader\n\tif p.IsVideo {\n\t\tpid = videoPID\n\t\tvideoH, _ = p.Header.(av.VideoPacketHeader)\n\t\tpts = dts + int64(videoH.CompositionTime())*int64(h264DefaultHZ)\n\t}\n\terr := pes.packet(p, pts, dts)\n\tif err != nil {\n\t\treturn err\n\t}\n\tpesHeaderLen := pes.len\n\tpacketBytesLen := len(p.Data) + int(pesHeaderLen)\n\n\tfor {\n\t\tif packetBytesLen <= 0 {\n\t\t\tbreak\n\t\t}\n\t\tif p.IsVideo {\n\t\t\tmuxer.videoCc++\n\t\t\tif muxer.videoCc > 0xf {\n\t\t\t\tmuxer.videoCc = 0\n\t\t\t}\n\t\t} else {\n\t\t\tmuxer.audioCc++\n\t\t\tif muxer.audioCc > 0xf {\n\t\t\t\tmuxer.audioCc = 0\n\t\t\t}\n\t\t}\n\n\t\ti := byte(0)\n\n\t\t//sync byte\n\t\tmuxer.tsPacket[i] = 0x47\n\t\ti++\n\n\t\t//error indicator, unit start indicator,ts priority,pid\n\t\tmuxer.tsPacket[i] = byte(pid >> 8) //pid high 5 bits\n\t\tif first {\n\t\t\tmuxer.tsPacket[i] = muxer.tsPacket[i] | 0x40 //unit start indicator\n\t\t}\n\t\ti++\n\n\t\t//pid low 8 bits\n\t\tmuxer.tsPacket[i] = byte(pid)\n\t\ti++\n\n\t\t//scram control, adaptation control, counter\n\t\tif p.IsVideo {\n\t\t\tmuxer.tsPacket[i] = 0x10 | byte(muxer.videoCc&0x0f)\n\t\t} else {\n\t\t\tmuxer.tsPacket[i] = 0x10 | byte(muxer.audioCc&0x0f)\n\t\t}\n\t\ti++\n\n\t\t// key frames need a PCR in the adaptation field\n\t\tif first && p.IsVideo && videoH.IsKeyFrame() {\n\t\t\tmuxer.tsPacket[3] |= 0x20\n\t\t\tmuxer.tsPacket[i] = 7\n\t\t\ti++\n\t\t\tmuxer.tsPacket[i] = 0x50\n\t\t\ti++\n\t\t\tmuxer.writePcr(muxer.tsPacket[0:], i, dts)\n\t\t\ti += 6\n\t\t}\n\n\t\t//frame data\n\t\tif 
packetBytesLen >= tsDefaultDataLen {\n\t\t\tdataLen = tsDefaultDataLen\n\t\t\tif first {\n\t\t\t\tdataLen -= (i - 4)\n\t\t\t}\n\t\t} else {\n\t\t\tmuxer.tsPacket[3] |= 0x20 //have adaptation\n\t\t\tremainBytes := byte(0)\n\t\t\tdataLen = byte(packetBytesLen)\n\t\t\tif first {\n\t\t\t\tremainBytes = tsDefaultDataLen - dataLen - (i - 4)\n\t\t\t} else {\n\t\t\t\tremainBytes = tsDefaultDataLen - dataLen\n\t\t\t}\n\t\t\tmuxer.adaptationBufInit(muxer.tsPacket[i:], byte(remainBytes))\n\t\t\ti += remainBytes\n\t\t}\n\t\tif first && i < tsPacketLen && pesHeaderLen > 0 {\n\t\t\ttmpLen = tsPacketLen - i\n\t\t\tif pesHeaderLen <= tmpLen {\n\t\t\t\ttmpLen = pesHeaderLen\n\t\t\t}\n\t\t\tcopy(muxer.tsPacket[i:], pes.data[pesIndex:pesIndex+int(tmpLen)])\n\t\t\ti += tmpLen\n\t\t\tpacketBytesLen -= int(tmpLen)\n\t\t\tdataLen -= tmpLen\n\t\t\tpesHeaderLen -= tmpLen\n\t\t\tpesIndex += int(tmpLen)\n\t\t}\n\n\t\tif i < tsPacketLen {\n\t\t\ttmpLen = tsPacketLen - i\n\t\t\tif tmpLen <= dataLen {\n\t\t\t\tdataLen = tmpLen\n\t\t\t}\n\t\t\tcopy(muxer.tsPacket[i:], p.Data[wBytes:wBytes+int(dataLen)])\n\t\t\twBytes += int(dataLen)\n\t\t\tpacketBytesLen -= int(dataLen)\n\t\t}\n\t\tif w != nil {\n\t\t\tif _, err := w.Write(muxer.tsPacket[0:]); err != nil {\n\t\t\t\treturn err\n\t\t\t}\n\t\t}\n\t\tfirst = false\n\t}\n\n\treturn nil\n}\n\n//PAT return pat data\nfunc (muxer *Muxer) PAT() []byte {\n\ti := 0\n\tremainByte := 0\n\ttsHeader := []byte{0x47, 0x40, 0x00, 0x10, 0x00}\n\tpatHeader := []byte{0x00, 0xb0, 0x0d, 0x00, 0x01, 0xc1, 0x00, 0x00, 0x00, 0x01, 0xf0, 0x01}\n\n\tif muxer.patCc > 0xf {\n\t\tmuxer.patCc = 0\n\t}\n\ttsHeader[3] |= muxer.patCc & 0x0f\n\tmuxer.patCc++\n\n\tcopy(muxer.pat[i:], tsHeader)\n\ti += len(tsHeader)\n\n\tcopy(muxer.pat[i:], patHeader)\n\ti += len(patHeader)\n\n\tcrc32Value := GenCrc32(patHeader)\n\tmuxer.pat[i] = byte(crc32Value >> 24)\n\ti++\n\tmuxer.pat[i] = byte(crc32Value >> 16)\n\ti++\n\tmuxer.pat[i] = byte(crc32Value >> 8)\n\ti++\n\tmuxer.pat[i] = 
byte(crc32Value)\n\ti++\n\n\tremainByte = int(tsPacketLen - i)\n\tfor j := 0; j < remainByte; j++ {\n\t\tmuxer.pat[i+j] = 0xff\n\t}\n\n\treturn muxer.pat[0:]\n}\n\n// PMT return pmt data\nfunc (muxer *Muxer) PMT(soundFormat byte, hasVideo bool) []byte {\n\ti := int(0)\n\tj := int(0)\n\tvar progInfo []byte\n\tremainBytes := int(0)\n\ttsHeader := []byte{0x47, 0x50, 0x01, 0x10, 0x00}\n\tpmtHeader := []byte{0x02, 0xb0, 0xff, 0x00, 0x01, 0xc1, 0x00, 0x00, 0xe1, 0x00, 0xf0, 0x00}\n\tif !hasVideo {\n\t\tpmtHeader[9] = 0x01\n\t\tprogInfo = []byte{0x0f, 0xe1, 0x01, 0xf0, 0x00}\n\t} else {\n\t\tprogInfo = []byte{0x1b, 0xe1, 0x00, 0xf0, 0x00, //h264 or h265*\n\t\t\t0x0f, 0xe1, 0x01, 0xf0, 0x00, //mp3 or aac\n\t\t}\n\t}\n\tpmtHeader[2] = byte(len(progInfo) + 9 + 4)\n\n\tif muxer.pmtCc > 0xf {\n\t\tmuxer.pmtCc = 0\n\t}\n\ttsHeader[3] |= muxer.pmtCc & 0x0f\n\tmuxer.pmtCc++\n\n\tif soundFormat == 2 ||\n\t\tsoundFormat == 14 {\n\t\tif hasVideo {\n\t\t\tprogInfo[5] = 0x4\n\t\t} else {\n\t\t\tprogInfo[0] = 0x4\n\t\t}\n\t}\n\n\tcopy(muxer.pmt[i:], tsHeader)\n\ti += len(tsHeader)\n\n\tcopy(muxer.pmt[i:], pmtHeader)\n\ti += len(pmtHeader)\n\n\tcopy(muxer.pmt[i:], progInfo[0:])\n\ti += len(progInfo)\n\n\tcrc32Value := GenCrc32(muxer.pmt[5 : 5+len(pmtHeader)+len(progInfo)])\n\tmuxer.pmt[i] = byte(crc32Value >> 24)\n\ti++\n\tmuxer.pmt[i] = byte(crc32Value >> 16)\n\ti++\n\tmuxer.pmt[i] = byte(crc32Value >> 8)\n\ti++\n\tmuxer.pmt[i] = byte(crc32Value)\n\ti++\n\n\tremainBytes = int(tsPacketLen - i)\n\tfor j = 0; j < remainBytes; j++ {\n\t\tmuxer.pmt[i+j] = 0xff\n\t}\n\n\treturn muxer.pmt[0:]\n}\n\nfunc (muxer *Muxer) adaptationBufInit(src []byte, remainBytes byte) {\n\tsrc[0] = byte(remainBytes - 1)\n\tif remainBytes == 1 {\n\t} else {\n\t\tsrc[1] = 0x00\n\t\tfor i := 2; i < len(src); i++ {\n\t\t\tsrc[i] = 0xff\n\t\t}\n\t}\n\treturn\n}\n\nfunc (muxer *Muxer) writePcr(b []byte, i byte, pcr int64) error {\n\tb[i] = byte(pcr >> 25)\n\ti++\n\tb[i] = byte((pcr >> 17) & 0xff)\n\ti++\n\tb[i] = 
byte((pcr >> 9) & 0xff)\n\ti++\n\tb[i] = byte((pcr >> 1) & 0xff)\n\ti++\n\tb[i] = byte(((pcr & 0x1) << 7) | 0x7e)\n\ti++\n\tb[i] = 0x00\n\n\treturn nil\n}\n\ntype pesHeader struct {\n\tlen byte\n\tdata [tsPacketLen]byte\n}\n\n//pesPacket return pes packet\nfunc (header *pesHeader) packet(p *av.Packet, pts, dts int64) error {\n\t//PES header\n\ti := 0\n\theader.data[i] = 0x00\n\ti++\n\theader.data[i] = 0x00\n\ti++\n\theader.data[i] = 0x01\n\ti++\n\n\tsid := audioSID\n\tif p.IsVideo {\n\t\tsid = videoSID\n\t}\n\theader.data[i] = byte(sid)\n\ti++\n\n\tflag := 0x80\n\tptslen := 5\n\tdtslen := ptslen\n\theaderSize := ptslen\n\tif p.IsVideo && pts != dts {\n\t\tflag |= 0x40\n\t\theaderSize += 5 //add dts\n\t}\n\tsize := len(p.Data) + headerSize + 3\n\tif size > 0xffff {\n\t\tsize = 0\n\t}\n\theader.data[i] = byte(size >> 8)\n\ti++\n\theader.data[i] = byte(size)\n\ti++\n\n\theader.data[i] = 0x80\n\ti++\n\theader.data[i] = byte(flag)\n\ti++\n\theader.data[i] = byte(headerSize)\n\ti++\n\n\theader.writeTs(header.data[0:], i, flag>>6, pts)\n\ti += ptslen\n\tif p.IsVideo && pts != dts {\n\t\theader.writeTs(header.data[0:], i, 1, dts)\n\t\ti += dtslen\n\t}\n\n\theader.len = byte(i)\n\n\treturn nil\n}\n\nfunc (header *pesHeader) writeTs(src []byte, i int, fb int, ts int64) {\n\tval := uint32(0)\n\tif ts > 0x1ffffffff {\n\t\tts -= 0x1ffffffff\n\t}\n\tval = uint32(fb<<4) | ((uint32(ts>>30) & 0x07) << 1) | 1\n\tsrc[i] = byte(val)\n\ti++\n\n\tval = ((uint32(ts>>15) & 0x7fff) << 1) | 1\n\tsrc[i] = byte(val >> 8)\n\ti++\n\tsrc[i] = byte(val)\n\ti++\n\n\tval = (uint32(ts&0x7fff) << 1) | 1\n\tsrc[i] = byte(val >> 8)\n\ti++\n\tsrc[i] = byte(val)\n}","attrs":{}}]},{"type":"heading","attrs":{"align":null,"level":4},"content":[{"type":"text","text":"scheduler","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"scheduler It's mainly to schedule tasks , So what are the main tasks ? 
Mainly tasks for which an ordinary API call cannot return an immediate result. For example, our video website needs video review and data reclamation. In these cases we apply a short delay: the user can no longer see the video, but it still exists in the backend until the scheduler processes it asynchronously. Periodic tasks are another example. The scheduler also contains a Timer, which is mainly used to run tasks on a schedule.","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Our architecture therefore looks like this:","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/ea/ea2504ab42a76f615c4fbe151aab2776.png","alt":null,"title":"","style":[{"key":"width","value":"100%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"In this section, we implement the scheduler as a runner built on the producer/consumer model. 
The code is as follows:","attrs":{}}]},{"type":"codeblock","attrs":{"lang":"text"},"content":[{"type":"text","text":"package taskrunner\n\n// controlChan, dataChan, fn and the READY_TO_DISPATCH, READY_TO_EXECUTE and CLOSE\n// constants are assumed to be defined elsewhere in this package.\n\ntype Runner struct {\n Controller controlChan\n Error controlChan\n Data dataChan\n dataSize int\n longLived bool\n Dispatcher fn\n Executor fn\n}\n\nfunc NewRunner(size int, longlived bool, d fn, e fn) *Runner {\n return &Runner{\n  Controller: make(chan string, 1),\n  Error: make(chan string, 1),\n  Data: make(chan interface{}, size),\n  longLived: longlived,\n  dataSize: size,\n  Dispatcher: d,\n  Executor: e,\n }\n}\n\nfunc (r *Runner) startDispatch() {\n // a runner that is not long-lived releases its channels when the loop exits\n defer func() {\n  if !r.longLived {\n   close(r.Controller)\n   close(r.Data)\n   close(r.Error)\n  }\n }()\n\n for {\n  select {\n  case c := <-r.Controller:\n   if c == READY_TO_DISPATCH {\n    err := r.Dispatcher(r.Data)\n    if err != nil {\n     r.Error <- CLOSE\n    } else {\n     r.Controller <- READY_TO_EXECUTE\n    }\n   }\n\n   if c == READY_TO_EXECUTE {\n    err := r.Executor(r.Data)\n    if err != nil {\n     r.Error <- CLOSE\n    } else {\n     r.Controller <- READY_TO_DISPATCH\n    }\n   }\n  case e := <-r.Error:\n   if e == CLOSE {\n    return\n   }\n  default:\n\n  }\n }\n}\n\nfunc (r *Runner) StartAll() {\n r.Controller <- READY_TO_DISPATCH\n r.startDispatch()\n}\n","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"The Runner is reusable; what comes next is the Task, which customizes the Runner. 
One example is deleting videos after a delay.","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"First, let's look at how the data is fetched:","attrs":{}}]},{"type":"codeblock","attrs":{"lang":"text"},"content":[{"type":"text","text":"package dbops\n\nimport (\n \"log\"\n _ \"github.com/go-sql-driver/mysql\"\n)\n\nfunc ReadVideoDeletionRecord(count int) ([]string, error) {\n var ids []string\n\n stmtOut, err := dbConn.Prepare(\"SELECT video_id FROM video_del_rec LIMIT ?\")\n if err != nil {\n  return ids, err\n }\n // defer right after a successful Prepare so every return path closes the statement\n defer stmtOut.Close()\n\n rows, err := stmtOut.Query(count)\n if err != nil {\n  log.Printf(\"Query VideoDeletionRecord error: %v\", err)\n  return ids, err\n }\n defer rows.Close()\n\n for rows.Next() {\n  var id string\n  if err := rows.Scan(&id); err != nil {\n   return ids, err\n  }\n\n  ids = append(ids, id)\n }\n\n return ids, rows.Err()\n}\n\nfunc DelVideoDeletionRecord(vid string) error {\n stmtDel, err := dbConn.Prepare(\"DELETE FROM video_del_rec WHERE video_id=?\")\n if err != nil {\n  return err\n }\n defer stmtDel.Close()\n\n _, err = stmtDel.Exec(vid)\n if err != nil {\n  log.Printf(\"Deleting VideoDeletionRecord error: %v\", err)\n  return err\n }\n\n return nil\n}","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Once we have the data, it needs to be processed. This is the job of the task:","attrs":{}}]},{"type":"codeblock","attrs":{"lang":"text"},"content":[{"type":"text","text":"package taskrunner\n\nimport (\n \"os\"\n \"errors\"\n \"log\"\n \"sync\"\n \"github.com/avenssi/video_server/scheduler/dbops\"\n)\n\nfunc deleteVideo(vid string) error {\n err := os.Remove(VIDEO_PATH + vid)\n\n if err != nil && !os.IsNotExist(err) {\n  log.Printf(\"Deleting video error: %v\", err)\n  return err\n }\n\n return nil\n}\n\nfunc VideoClearDispatcher(dc dataChan) error {\n res, err := dbops.ReadVideoDeletionRecord(3)\n if err != nil {\n  
log.Printf(\"Video clear dispatcher error: %v\", err)\n  return err\n }\n\n if len(res) == 0 {\n  return errors.New(\"All tasks finished\")\n }\n\n for _, id := range res {\n  dc <- id\n }\n\n return nil\n}\n\nfunc VideoClearExecutor(dc dataChan) error {\n errMap := &sync.Map{}\n var wg sync.WaitGroup\n var err error\n\n forloop:\n  for {\n   select {\n   case vid := <-dc:\n    wg.Add(1)\n    go func(id interface{}) {\n     defer wg.Done()\n     if err := deleteVideo(id.(string)); err != nil {\n      errMap.Store(id, err)\n      return\n     }\n     if err := dbops.DelVideoDeletionRecord(id.(string)); err != nil {\n      errMap.Store(id, err)\n      return\n     }\n    }(vid)\n   default:\n    break forloop\n   }\n  }\n\n // wait for all deletions to finish before collecting their errors\n wg.Wait()\n\n errMap.Range(func(k, v interface{}) bool {\n  err = v.(error)\n  if err != nil {\n   return false\n  }\n  return true\n })\n\n return err\n}\n","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"That covers how video information is processed asynchronously and on a schedule.","attrs":{}}]},{"type":"heading","attrs":{"align":null,"level":4},"content":[{"type":"text","text":"stream server","attrs":{}}]},{"type":"bulletedlist","content":[{"type":"listitem","attrs":{"listStyle":null},"content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","marks":[{"type":"strong","attrs":{}}],"text":"Streaming","attrs":{}}]}]},{"type":"listitem","attrs":{"listStyle":null},"content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","marks":[{"type":"strong","attrs":{}}],"text":"Upload files","attrs":{}}]}]}],"attrs":{}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Streaming differs from ordinary requests mainly in that it has to hold long-lived connections rather than short ones. 
When a request arrives, the server keeps streaming data to the client, and the connection stays open for a long time. Holding many long-lived connections at once causes a problem: if clients keep connecting and opening pages, the service will eventually crash. We therefore need flow control (a limit). The flow control here can only be applied when a connection is established.","attrs":{}}]},{"type":"codeblock","attrs":{"lang":"text"},"content":[{"type":"text","text":"package main \n\nimport (\n \"log\"\n)\n\ntype ConnLimiter struct {\n concurrentConn int\n bucket chan int\n}\n\nfunc NewConnLimiter(cc int) *ConnLimiter {\n return &ConnLimiter{\n  concurrentConn: cc,\n  bucket: make(chan int, cc),\n }\n}\n\nfunc (cl *ConnLimiter) GetConn() bool {\n // non-blocking send: take a slot if the bucket has room, otherwise reject;\n // this avoids the race between checking len(bucket) and sending\n select {\n case cl.bucket <- 1:\n  return true\n default:\n  log.Printf(\"Reached the rate limitation.\")\n  return false\n }\n}\n\nfunc (cl *ConnLimiter) ReleaseConn() {\n c := <-cl.bucket\n log.Printf(\"Connection released: %d\", c)\n}\n","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"With the limiter in place, we embed it in an HTTP middleware. At startup we also need to register the router and start the HTTP server, so the code is as follows:","attrs":{}}]},{"type":"codeblock","attrs":{"lang":"text"},"content":[{"type":"text","text":"package main \n\nimport (\n \"net/http\"\n \"github.com/julienschmidt/httprouter\"\n)\n\ntype middleWareHandler struct {\n r *httprouter.Router\n l *ConnLimiter\n}\n\nfunc NewMiddleWareHandler(r *httprouter.Router, cc int) http.Handler {\n m := middleWareHandler{}\n m.r = r\n m.l = NewConnLimiter(cc)\n return m\n}\n\nfunc RegisterHandlers() *httprouter.Router {\n router := httprouter.New()\n\n router.GET(\"/videos/:vid-id\", streamHandler)\n\n router.POST(\"/upload/:vid-id\", uploadHandler)\n\n router.GET(\"/testpage\", 
testPageHandler)\n\n return router\n}\n\nfunc (m middleWareHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {\n if !m.l.GetConn() {\n  sendErrorResponse(w, http.StatusTooManyRequests, \"Too many requests\")\n  return\n }\n // release the slot when the request finishes, even if the handler panics\n defer m.l.ReleaseConn()\n\n m.r.ServeHTTP(w, r)\n}\n\nfunc main() {\n r := RegisterHandlers()\n mh := NewMiddleWareHandler(r, 2)\n http.ListenAndServe(\":2000\", mh)\n}\n","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Finally, let's see how streamHandler handles a request:","attrs":{}}]},{"type":"codeblock","attrs":{"lang":"text"},"content":[{"type":"text","text":"func streamHandler(w http.ResponseWriter, r *http.Request, p httprouter.Params) {\n vid := p.ByName(\"vid-id\")\n vl := VIDEO_DIR + vid\n\n video, err := os.Open(vl)\n if err != nil {\n  log.Printf(\"Error when try to open file: %v\", err)\n  sendErrorResponse(w, http.StatusInternalServerError, \"Internal Error\")\n  return\n }\n // close the file once the response has been served\n defer video.Close()\n\n w.Header().Set(\"Content-Type\", \"video/mp4\")\n http.ServeContent(w, r, \"\", time.Now(), video)\n}\n","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"We take a fairly generic approach here: once we have the unique ID of the stream, we serve it directly. When uploading files, we first validate the request (a size check) and then read the data from it:","attrs":{}}]},{"type":"codeblock","attrs":{"lang":"text"},"content":[{"type":"text","text":"func uploadHandler(w http.ResponseWriter, r *http.Request, p httprouter.Params) {\n r.Body = http.MaxBytesReader(w, r.Body, MAX_UPLOAD_SIZE)\n if err := r.ParseMultipartForm(MAX_UPLOAD_SIZE); err != nil {\n  sendErrorResponse(w, http.StatusBadRequest, \"File is too big\")\n  return \n }\n\n file, _, err := r.FormFile(\"file\")\n if err != nil {\n  log.Printf(\"Error when try to get file: %v\", err)\n  sendErrorResponse(w, 
http.StatusInternalServerError, \"Internal Error\")\n  return \n }\n\n data, err := ioutil.ReadAll(file)\n if err != nil {\n  log.Printf(\"Read file error: %v\", err)\n  sendErrorResponse(w, http.StatusInternalServerError, \"Internal Error\")\n  return\n }\n\n fn := p.ByName(\"vid-id\")\n err = ioutil.WriteFile(VIDEO_DIR + fn, data, 0666)\n if err != nil {\n  log.Printf(\"Write file error: %v\", err)\n  sendErrorResponse(w, http.StatusInternalServerError, \"Internal Error\")\n  return\n }\n\n w.WriteHeader(http.StatusCreated)\n io.WriteString(w, \"Uploaded successfully\")\n}\n","attrs":{}}]},{"type":"heading","attrs":{"align":null,"level":2},"content":[{"type":"text","text":" Website deployment ","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Cloud native is both an abstract and a concrete concept. It is not a specific product but a set of technologies and a methodology. As the open source technologies around cloud-native architecture continue to develop, the cloud-native technology stack will become mainstream and affect every engineer and every enterprise. 
So we follow the trend into the cloud-native era and use k8s to deploy our website services.","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"First, we compile and package the code above:","attrs":{}}]},{"type":"codeblock","attrs":{"lang":"text"},"content":[{"type":"text","text":"FROM ubuntu:16.04 as build\n\nENV TZ=Asia/Shanghai\nRUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone\n\nRUN apt-get update && apt-get install -y --no-install-recommends \\\n        g++ \\\n        ca-certificates \\\n        wget && \\\n    rm -rf /var/lib/apt/lists/*\n\nENV GOLANG_VERSION 1.15.1\nRUN wget -nv -O - https://studygolang.com/dl/golang/go1.15.1.linux-amd64.tar.gz \\\n     | tar -C /usr/local -xz\n\n\nENV GOPROXY=https://goproxy.cn,direct\nENV GO111MODULE=on\nENV GOPATH /go\nENV PATH $GOPATH/bin:/usr/local/go/bin:$PATH\n\nWORKDIR /go/src\nCOPY . .\nWORKDIR /go/src/video-service\nRUN  sed -i \"/runmode/crunmode=pro\" /go/src/video-service/conf/app.conf\nRUN export CGO_LDFLAGS_ALLOW='-Wl,--unresolved-symbols=ignore-in-object-files' && \\\n    go install -ldflags=\"-s -w\" -v /go/src/video-service\n\nFROM ubuntu:16.04\nWORKDIR /video-service\n\nRUN mkdir -p log\nCOPY --from=build /go/bin/video-service /video-service\nCMD [\"./video-service\"]\n","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"The k8s deployment script is simple: a short YAML or JSON document is submitted to the k8s API service, which then carries out the deployment. 
Next, we add the deployment manifest:","attrs":{}}]},{"type":"codeblock","attrs":{"lang":"text"},"content":[{"type":"text","text":"---\napiVersion: apps/v1\nkind: Deployment\nmetadata:\n  labels:\n    app: video-service\n  name: video-service\n  namespace: system-server\nspec:\n  replicas: 1\n  selector:\n    matchLabels:\n      app: video-service\n  template:\n    metadata:\n      labels:\n        app: video-service\n    spec:\n      containers:\n        - image: {{ cluster_cfg['cluster']['docker-registry']['prefix'] }}video-service\n          imagePullPolicy: Always\n          name: video-service\n          ports:\n            - containerPort: 1000\n          #livenessProbe:\n            #httpGet:\n              #path: /api/v1/healthz\n              #port: 1000\n              #scheme: HTTP\n            #initialDelaySeconds: 15\n            #periodSeconds: 10\n            #timeoutSeconds: 3\n            #failureThreshold: 5\n          volumeMounts:\n            - name: video-service-config\n              mountPath: /video-service/conf\n      volumes:\n        - name: video-service-config\n          configMap:\n            name: video-service-config\n      nodeSelector:\n        video-service: \"true\"\n      restartPolicy: Always\n","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"After compiling and packaging, run the deployment:","attrs":{}}]},{"type":"codeblock","attrs":{"lang":"text"},"content":[{"type":"text","text":"sh build/build.sh\nkubectl create -f deploy.yml","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"We deploy with K8s here. Its main advantage is that, once the command has run, it automatically creates the underlying pod and containers for us and manages them. Let's take a look at the service startup 
:","attrs":{}}]},{"type":"codeblock","attrs":{"lang":"text"},"content":[{"type":"text","text":"tess@cm001:~$ kubectl describe po video-service-646ccc4c5c-qrpgp -n system-server \nName: video-service-646ccc4c5c-qrpgp\nNamespace: system-server\nPriority: 0\nNode: cm001/10.11.32.21\nStart Time: Wed, 21 Apr 2021 15:38:48 +0800\nLabels: app=video-service\n pod-template-hash=646ccc4c5c\nAnnotations: cni.projectcalico.org/podIP: 20.247.87.138/32\nStatus: Running\nIP: 20.247.87.138\nControlled By: ReplicaSet/video-service-646ccc4c5c\nContainers:\n video-service:\n Container ID: docker://f284ceac649f4e1a29ac77cdd425ccc852caf67cc9c133144a7d1c8747e32aea\n Image: minicub/video-service\n Image ID: docker-pullable://minicub/video-service@sha256:3036927b55c6be9efc4cf34d6ad04aba093e04b49807c18ce0f7a418b2577bf4\n Port: 1000/TCP\n Host Port: 0/TCP\n State: Running\n Started: Wed, 28 Apr 2021 16:01:15 +0800\n Ready: True\n Restart Count: 1\n Environment: \n Mounts:\n /video-service/conf from video-service-config (rw)\n /var/run/secrets/kubernetes.io/serviceaccount from default-token-62wgr (ro)\nConditions:\n Type Status\n Initialized True \n Ready True \n ContainersReady True \n PodScheduled True \nVolumes:\n video-service-config:\n Type: ConfigMap (a volume populated by a ConfigMap)\n Name: video-service-config\n Optional: false\n default-token-62wgr:\n Type: Secret (a volume populated by a Secret)\n SecretName: default-token-62wgr\n Optional: false\nQoS Class: BestEffort\nNode-Selectors: video-service=true\nTolerations: node.kubernetes.io/not-ready:NoExecute for 300s\n node.kubernetes.io/unreachable:NoExecute for 300s\nEvents: ","attrs":{}}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":" Use here K8s Deploy to the machine . 
After deployment, the service is available at: ","attrs":{}},{"type":"link","attrs":{"href":"http://10.11.32.21:1000","title":"","type":null},"content":[{"type":"text","text":"http://10.11.32.21:1000","attrs":{}}]}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Finally, let's open the website and play a video to see what the interface looks like:","attrs":{}}]},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/cb/cb5079f519f5e6660c297f01fd9b7a00.png","alt":null,"title":"","style":[{"key":"width","value":"100%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}}]}