您的位置:首页 > 脚本大全 > > 正文

python3.8爬虫需要的包(python爬取基于m3u8协议的ts文件并合并)

更多 时间:2021-10-15 00:48:32 类别:脚本大全 浏览量:106

python3.8爬虫需要的包

python爬取基于m3u8协议的ts文件并合并

前言

简单学习过网络爬虫,只是之前都是照着书上做并发,大概能理解,却还是无法自己用到自己项目中,这里自己研究实现一个网页嗅探html5播放控件中基于m3u8协议ts格式视频资源的项目,并未考虑过复杂情况,毕竟只是练练手.

源码

  • ?
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • # coding=utf-8
  • import asyncio
  • import multiprocessing
  • import os
  • import re
  • import time
  • from math import floor
  • from multiprocessing import manager
  • import aiohttp
  • import requests
  • from lxml import html
  • import threading
  • from src.my_lib import retry
  • from src.my_lib import time_statistics
  •  
  •  
  • class m3u8download:
  •  _path = "./resource\\" # 本地文件路径
  •  _url_seed = none # 资源所在链接前缀
  •  _target_url = {} # 资源任务目标字典
  •  _mode = ""
  •  _headers = {"user-agent": "mozilla/5.0"} # 浏览器代理
  •  _target_num = 100
  •  
  •  def __init__(self):
  •  self._ml = manager().list() # 进程通信列表
  •  if not os.path.exists(self._path): # 检测本地目录存在否
  •   os.makedirs(self._path)
  •  exec_str = r'chcp 65001'
  •  os.system(exec_str) # 先切换utf-8输出,防止控制台乱码
  •  
  •  def sniffing(self, url):
  •  self._url = url
  •  print("开始嗅探...")
  •  try:
  •   r = requests.get(self._url) # 访问嗅探网址,获取网页信息
  •  except:
  •   print("嗅探失败,网址不正确")
  •   os.system("pause")
  •  else:
  •   tree = html.fromstring(r.content)
  •   try:
  •   source_url = tree.xpath('//video//source/@src')[0] # 嗅探资源控制文件链接,这里只针对一个资源控制文件
  •   # self._url_seed = re.split("/\w+\.m3u8", source_url)[0] # 从资源控制文件链接解析域名
  •   except:
  •   print("嗅探失败,未发现资源")
  •   os.system("pause")
  •   else:
  •   self.analysis(source_url)
  •  
  •  def analysis(self, source_url):
  •  try:
  •   self._url_seed = re.split("/\w+\.m3u8", source_url)[0] # 从资源控制文件链接解析域名
  •   with requests.get(source_url) as r: # 访问资源控制文件,获得资源信息
  •   src = re.split("\n*#.+\n", r.text) # 解析资源信息
  •   for sub_src in src: # 将资源地址储存到任务字典
  •    if sub_src:
  •    self._target_url[sub_src] = self._url_seed + "/" + sub_src
  •  except exception as e:
  •   print("资源无法成功解析", e)
  •   os.system("pause")
  •  else:
  •   self._target_num = len(self._target_url)
  •   print("sniffing success!!!,found", self._target_num, "url.")
  •   self._mode = input(
  •   "1:-> 单进程(low b)\n2:-> 多进程+多线程(网速开始biubiu飞起!)\n3:-> 多进程+协程(最先进的并发!!!)\n")
  •   if self._mode == "1":
  •   for path, url in self._target_url.items():
  •    self._download(path, url)
  •   elif self._mode == "2" or self._mode == "3":
  •   self._multiprocessing()
  •  
  •  def _multiprocessing(self, processing_num=4): # 多进程,多线程
  •  target_list = {} # 进程任务字典,储存每个进程分配的任务
  •  pool = multiprocessing.pool(processes=processing_num) # 开启进程池
  •  i = 0 # 任务分配标识
  •  for path, url in self._target_url.items(): # 分配进程任务
  •   target_list[path] = url
  •   i += 1
  •   if i % 10 == 0 or i == len(self._target_url): # 每个进程分配十个任务
  •   if self._mode == "2":
  •    pool.apply_async(self._sub_multithreading, kwds=target_list) # 使用多线程驱动方法
  •   else:
  •    pool.apply_async(self._sub_coroutine, kwds=target_list) # 使用协程驱动方法
  •   target_list = {}
  •  pool.close() # join函数等待所有子进程结束
  •  pool.join() # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool
  •  while true:
  •   if self._judge_over():
  •   self._combine()
  •   break
  •  
  •  def _sub_multithreading(self, **kwargs):
  •  for path, url in kwargs.items(): # 根据进程任务开启线程
  •   t = threading.thread(target=self._download, args=(path, url,))
  •   t.start()
  •  
  •  @retry()
  •  def _download(self, path, url): # 同步下载方法
  •  with requests.get(url, headers=self._headers) as r:
  •   if r.status_code == 200:
  •   with open(self._path + path, "wb")as file:
  •    file.write(r.content)
  •   self._ml.append(0) # 每成功一个就往进程通信列表增加一个值
  •   percent = '%.2f' % (len(self._ml) / self._target_num * 100)
  •   print(len(self._ml), ": ", path, "->ok", "\tcomplete:", percent, "%") # 显示下载进度
  •   else:
  •   print(path, r.status_code, r.reason)
  •  
  •  def _sub_coroutine(self, **kwargs):
  •  tasks = []
  •  for path, url in kwargs.items(): # 根据进程任务创建协程任务列表
  •   tasks.append(asyncio.ensure_future(self._async_download(path, url)))
  •  loop = asyncio.get_event_loop() # 创建异步事件循环
  •  loop.run_until_complete(asyncio.wait(tasks)) # 注册任务列表
  •  
  •  async def _async_download(self, path, url): # 异步下载方法
  •  async with aiohttp.clientsession() as session:
  •   async with session.get(url, headers=self._headers) as resp:
  •   try:
  •    assert resp.status == 200, "e" # 断言状态码为200,否则抛异常,触发重试装饰器
  •    with open(self._path + path, "wb")as file:
  •    file.write(await resp.read())
  •   except exception as e:
  •    print(e)
  •   else:
  •    self._ml.append(0) # 每成功一个就往进程通信列表增加一个值
  •    percent = '%.2f' % (len(self._ml) / self._target_num * 100)
  •    print(len(self._ml), ": ", path, "->ok", "\tcomplete:", percent, "%") # 显示下载进度
  •  
  •  def _combine(self): # 组合资源方法
  •  try:
  •   print("开始组合资源...")
  •   identification = str(floor(time.time()))
  •   exec_str = r'copy /b "' + self._path + r'*.ts" "' + self._path + 'video' + identification + '.mp4"'
  •   os.system(exec_str) # 使用cmd命令将资源整合
  •   exec_str = r'del "' + self._path + r'*.ts"'
  •   os.system(exec_str) # 删除原来的文件
  •  except:
  •   print("资源组合失败")
  •  else:
  •   print("资源组合成功!")
  •  
  •  def _judge_over(self): # 判断是否全部下载完成
  •  if len(self._ml) == len(self._target_url):
  •   return true
  •  return false
  •  
  •  
  • @time_statistics
  • def app():
  •  multiprocessing.freeze_support()
  •  url = input("输入嗅探网址:\n")
  •  m3u8 = m3u8download()
  •  m3u8.sniffing(url)
  •  # m3u8.analysis(url)
  •  
  •  
  • if __name__ == "__main__":
  •  app()
  • 这里是两个装饰器的实现:

  • ?
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • import time
  •  
  •  
  • def time_statistics(fun):
  •  def function_timer(*args, **kwargs):
  •  t0 = time.time()
  •  result = fun(*args, **kwargs)
  •  t1 = time.time()
  •  print("total time running %s: %s seconds" % (fun.__name__, str(t1 - t0)))
  •  return result
  •  
  •  return function_timer
  •  
  •  
  • def retry(retries=3):
  •  def _retry(fun):
  •  def wrapper(*args, **kwargs):
  •   for _ in range(retries):
  •   try:
  •    return fun(*args, **kwargs)
  •   except exception as e:
  •    print("@", fun.__name__, "->", e)
  •  
  •  return wrapper
  •  
  •  return _retry
  • 打包成exe文件

    使用pyinstaller -f download.py将程序打包成单个可执行文件.
    这里需要注意一下,因为程序含有多进程,需要在执行前加一句multiprocessing.freeze_support(),不然程序会反复执行多进程前的功能.

    关于协程

    协程在python3.5进化到了async await版本,用 async 标记异步方法,在异步方法里对耗时操作使用await标记.这里使用了一个进程驱动协程的方法,在进程池创建多个协程任务,使用asyncio.get_event_loop()创建协程事件循环,使用run_until_complete()注册协程任务,asyncio.wait()方法接收一个任务列表进行协程注册.

    关于装饰器

    装饰器源于闭包原理,这里使用了两种装饰器.

    • @time_statistics:统计耗时,装饰器自己无参型
    • @retry():设置重试次数,装饰器自己有参型
    • 按我理解是有参型是将无参型装饰器包含在内部,而调用是加()的,关于():
    • 不带括号时,调用的是这个函数本身
    • 带括号(此时必须传入需要的参数),调用的是函数的return结果

    关于cmd控制台

    程序会使用cmd命令来将下载的ts文件合并.
    因为cmd默认使用gb2312编码,调用os.system()需要先切换成通用的utf-8输出,否则系统信息会乱码.
    而且使用cmd命令时参数最好加双引号,以避免特殊符号报错.

    以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持开心学习网。

    原文链接:https://blog.csdn.net/qq_37258787/article/details/80298084

    标签:Python 爬取 m3u8
    您可能感兴趣