/ 作者简介 /
本篇文章来自 MRArchive 的投稿,分享了如何使用Python下载M3U8格式的视频,希望对大家有所帮助!同时也感谢作者贡献的精彩文章。
作者信息
网站:https://toodo.fun
B站:https://space.bilibili.com/314191627
/ 背景简介 /
M3U 是一种播放多媒体列表的文件格式,它的设计初衷是为了播放音频文件,比如MP3,但是越来越多的软件现在用来播放视频文件列表,M3U也可以指定在线流媒体音频源。很多播放器和软件都支持M3U文件格式。
m3u8 文件是 HTTP Live Streaming(缩写为 HLS) 协议的部分内容,而 HLS 是一个由苹果公司提出的基于 HTTP 的流媒体网络传输协议。
HLS 是新一代流媒体传输协议,其基本实现原理为将一个大的媒体文件进行分片,将该分片文件资源路径记录于 m3u8 文件(即 playlist)内,其中附带一些额外描述(比如该资源的多带宽信息···)用于提供给客户端。客户端依据该 m3u8 文件即可获取对应的媒体资源,进行播放。
扩展M3U指令:
#EXTM3U //必需,表示一个扩展的m3u文件
#EXT-X-VERSION:3 //hls的协议版本号,暗示媒体流的兼容性
#EXT-X-MEDIA-SEQUENCE:xx //首个分段的sequence number
#EXT-X-ALLOW-CACHE:NO //是否缓存
#EXT-X-TARGETDURATION:5 //每个视频分段最大的时长(单位秒)
#EXT-X-DISCONTINUITY //表示换编码
#EXTINF: //每个切片的时长
/ 获取.m3u8文件中的视频信息 /
.m3u8文件储存了视频所在的位置信息,我们可以通过发送一个Get请求来获取链接中的内容:
content = requests.get(m3u8Url).text
/ 拼接视频下载链接 /
我们通过得到的视频信息进行视频网址拼接,.m3u8文件中的链接可以为全路径或者相对路径,所以我们判断后进行拼接并存入一个列表当中:
urls = []for index, video in enumerate(content.split('\n')): if '#EXTINF' in video: if content[index + 1][0] == '/': downloadLink = url.split('//')[0] + "//" + url.split('//')[1].split('/')[0] + content[index + 1] elif content[index + 1][:4] == 'http': downloadLink = content[index + 1] else: downloadLink = url.replace(url.split('/')[-1], content[index + 1]) urls.append(downloadLink)
/ 使用多线程下载视频 /
得到视频列表后,我们就可以对视频进行下载了,为了提高下载效率,我们可以使用多线程进行下载:
def download(downloadLink, name): for _ in range(10): try: req = requests.get(downloadLink, headers=headers, timeout=15).content with open(f"{name}", "wb") as f: f.write(req) f.flush() break except: if _ == 9: print(f"{name}下载失败") else: print(f"{name}正在进行第{_}次重试")pool = ThreadPoolExecutor(max_workers=threadNum)futures = []for index, downloadLink in enumerate(urls): fileList.append(os.path.basename(downloadLink)) futures.append(pool.submit(download, downloadLink, f"{downloadPath}/{os.path.basename(downloadLink)}"))wait(futures)
/ 合并视频 /
等待全部下载完成后,是一个个的ts视频文件,然后我们再将这些文件合并成一个视频文件,此时要注意视频的顺序,我们可以在我们的视频列表中依次取出进行合并
def merge_file(path, name): global fileList cmd = "copy /b " for i in fileList: if i != fileList[-1]: cmd += f"{i} + " else: cmd += f"{i} {name}" os.chdir(path) with open('combine.cmd', 'w') as f: f.write(cmd) os.system("combine.cmd") os.system('del /Q *.ts') os.system('del /Q *.cmd')
这里我们是写了一个脚本来完成合并的任务,使用命令'copy /b file1 + file2 +... + fileN newFile'进行合并,并于完成后将ts小文件进行了删除。至此,视频就下载完成了。
/ 完整代码如下 /
import requestsimport osfrom concurrent.futures import ThreadPoolExecutor, waitimport sysfinishedNum = 0allNum = 0fileList = []headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:62.0) Gecko/20100101 Firefox/62.0'}def download(downloadLink, name): global finishedNum global allNum for _ in range(10): try: req = requests.get(downloadLink, headers=headers, timeout=15).content with open(f"{name}", "wb") as f: f.write(req) f.flush() finishedNum += 1 print(f"{name}下载成功, 总进度{round(finishedNum / allNum * 100, 2)}% ({finishedNum}/{allNum})") break except: if _ == 9: print(f"{name}下载失败") else: print(f"{name}正在进行第{_}次重试")def merge_file(path, name): global fileList cmd = "copy /b " for i in fileList: if i != fileList[-1]: cmd += f"{i} + " else: cmd += f"{i} {name}" os.chdir(path) with open('combine.cmd', 'w') as f: f.write(cmd) os.system("combine.cmd") os.system('del /Q *.ts') os.system('del /Q *.cmd')def downloader(url, name, threadNum): global allNum global fileList print("读取文件信息中...") downloadPath = 'Download' if not os.path.exists(downloadPath): os.mkdir(downloadPath) # 查看是否存在 if os.path.exists(f"{downloadPath}/{name}"): print(f"视频文件已经存在,如需重新下载请先删除之前的视频文件") return content = requests.get(url, headers=headers).text.split('\n') if "#EXTM3U" not in content[0]: raise BaseException(f"非M3U8链接") # .m3u8 跳转 for video in content: if ".m3u8" in video: if video[0] == '/': url = url.split('//')[0] + "//" + url.split('//')[1].split('/')[0] + video elif video[:4] == 'http': url = video else: url = url.replace(url.split('/')[-1], video) print(url) content = requests.get(url, headers=headers).text.split('\n') urls = [] for index, video in enumerate(content): if '#EXTINF' in video: if content[index + 1][0] == '/': downloadLink = url.split('//')[0] + "//" + url.split('//')[1].split('/')[0] + content[index + 1] elif content[index + 1][:4] == 'http': downloadLink = content[index + 1] else: downloadLink = url.replace(url.split('/')[-1], content[index + 1]) urls.append(downloadLink) allNum = len(urls) pool = ThreadPoolExecutor(max_workers=threadNum) futures = [] for index, downloadLink in enumerate(urls): fileList.append(os.path.basename(downloadLink)) futures.append(pool.submit(download, downloadLink, f"{downloadPath}/{os.path.basename(downloadLink)}")) wait(futures) print(f"运行完成") merge_file(downloadPath, name) print(f"合并完成") print(f"文件下载成功,尽情享用吧")if __name__ == '__main__': videoUrl = str(sys.argv[1]) name = str(sys.argv[2]) threadNum = int(sys.argv[3]) downloader(videoUrl, name, threadNum)
/ 打包好的Windows版软件下载 /
百度云下载地址:https://pan.baidu.com/s/1ZsPb9WTmYP8VUKuR9tuoyw 提取码:mxb6
蓝奏云下载地址:https://lanzous.com/icnc1ve