From 8208c9e467a8a051d09f4d0ff95cf5bc68c7a8f3 Mon Sep 17 00:00:00 2001 From: Yuanle Song Date: Sun, 3 Mar 2019 16:03:52 +0800 Subject: [PATCH] v0.2.0 add process pool support moved default temp dir to ~/.cache/ --- Makefile | 7 ++- m3u8downloader/__init__.py | 2 +- m3u8downloader/config.py | 3 +- m3u8downloader/main.py | 100 +++++++++++++++++++++++++------- m3u8downloader/sanity_check.py | 2 +- m3u8downloader/test_main.py | 20 +++++++ operational | 101 +++++++++++++++++++++++++++++---- setup.py | 6 +- 8 files changed, 201 insertions(+), 40 deletions(-) diff --git a/Makefile b/Makefile index b78136a..33deb37 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ PYTHON_MODULES := m3u8downloader PYTHONPATH := . VENV := .venv PYTEST := env PYTHONPATH=$(PYTHONPATH) PYTEST=1 $(VENV)/bin/py.test -PYLINT := env PYTHONPATH=$(PYTHONPATH) $(VENV)/bin/pylint --disable=I0011 --msg-template="{path}:{line}: [{msg_id}({symbol}), {obj}] {msg}" +PYLINT := env PYTHONPATH=$(PYTHONPATH) $(VENV)/bin/pylint --disable=I0011,line-too-long,invalid-name --msg-template="{path}:{line}: [{msg_id}({symbol}), {obj}] {msg}" PEP8 := env PYTHONPATH=$(PYTHONPATH) $(VENV)/bin/pycodestyle --repeat --ignore=E202,E501,E402,W504 PYTHON := env PYTHONPATH=$(PYTHONPATH) $(VENV)/bin/python PIP := $(VENV)/bin/pip @@ -27,8 +27,9 @@ version: debug: env DEBUG=1 $(PYTHON) m3u8downloader/main.py run: -#$(PYTHON) m3u8downloader/main.py "高颜值小美女KTV卫生间没操得手 醉酒带到宾馆被两男无套抽插轮着操.mp4" "http://www.meituii.space/20190227/IaArsErV/index.m3u8" - $(PYTHON) m3u8downloader/main.py ~/d/t2/"网红主播在外面找男人之在停车场做爱直播系列。这种妓女主播.mp4" "http://www.meituii.space/20190227/D9cU9xCM/index.m3u8" + $(PYTHON) m3u8downloader/main.py $(OFILE) $(URL) +t1: + $(PYTHON) m3u8downloader/t1.py uwsgi: $(VENV)/bin/uwsgi --processes=2 --threads=4 --wsgi-file=m3u8downloader/main.py --env=PYTHONPATH=. 
--http=localhost:8082 --disable-logging shell: diff --git a/m3u8downloader/__init__.py b/m3u8downloader/__init__.py index 3dc1f76..d3ec452 100644 --- a/m3u8downloader/__init__.py +++ b/m3u8downloader/__init__.py @@ -1 +1 @@ -__version__ = "0.1.0" +__version__ = "0.2.0" diff --git a/m3u8downloader/config.py b/m3u8downloader/config.py index 2587eec..145da09 100644 --- a/m3u8downloader/config.py +++ b/m3u8downloader/config.py @@ -8,12 +8,13 @@ config variables from __future__ import (absolute_import, division, print_function, unicode_literals, with_statement) -import m3u8downloader.configlogger import logging import os from wells.config import ConfigurationManger +import m3u8downloader.configlogger # pylint: disable=unused-import + logger = logging.getLogger(__name__) diff --git a/m3u8downloader/main.py b/m3u8downloader/main.py index c4a5e35..0c5b9f8 100644 --- a/m3u8downloader/main.py +++ b/m3u8downloader/main.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # coding=utf-8 """download m3u8 file reliably. @@ -19,12 +19,14 @@ import subprocess import re from urllib.parse import urljoin, urlparse from collections import OrderedDict +import multiprocessing +import multiprocessing.queues import logging import requests from wells.utils import retry -import m3u8downloader.configlogger +import m3u8downloader.configlogger # pylint: disable=unused-import logger = logging.getLogger(__name__) SESSION = requests.Session() @@ -79,20 +81,36 @@ def get_suffix_from_url(url): return "." + r[-1] +def get_basename(filename): + """return filename with path and ext removed. + + """ + return os.path.splitext(os.path.basename(filename))[0] + + +def get_fullpath(filename): + """make a canonical absolute path filename. 
+ + """ + return os.path.abspath(os.path.expandvars(os.path.expanduser(filename))) + + class M3u8Downloader: def __init__(self, url, output_filename, tempdir="."): self.start_url = url logger.info("output_filename=%s", output_filename) - self.output_filename = output_filename - - _, output_filename_nodir = os.path.split(output_filename) - self.tempdir = os.path.abspath( - os.path.join(tempdir, "tmp-" + output_filename_nodir)) - os.makedirs(self.tempdir, exist_ok=True) - logger.info("using temp dir at: %s", self.tempdir) + self.output_filename = get_fullpath(output_filename) + self.tempdir = get_fullpath( + os.path.join(tempdir, get_basename(output_filename))) + try: + os.makedirs(self.tempdir, exist_ok=True) + logger.info("using temp dir at: %s", self.tempdir) + except IOError as _: + logger.exception("create tempdir failed for: %s", self.tempdir) + raise self.media_playlist_localfile = None - self.sequence_number = 0 + self.poolsize = 5 # {full_url: local_file} self.fragments = OrderedDict() @@ -114,11 +132,11 @@ class M3u8Downloader: logger.error("run ffmpeg command failed: exitcode=%s", proc.returncode) sys.exit(proc.returncode) - logger.info("mp4 file created: %s size: %.1fMiB", - target_mp4, filesizeMiB(target_mp4)) - logger.info("To clean up temp files:\nrm -rf \"%s\"", self.tempdir) - # logger.info("clean up temp files") - # subprocess.run(["/bin/rm", "-rf", self.tempdir]) + logger.info("mp4 file created, size=%.1fMiB, filename=%s", + filesizeMiB(target_mp4), target_mp4) + logger.info("Running: rm -rf \"%s\"", self.tempdir) + subprocess.run(["/bin/rm", "-rf", self.tempdir]) + logger.info("temp files removed") def mirror_url_resource(self, remote_file_url): """download remote file and replicate the same dir structure locally. @@ -161,14 +179,44 @@ class M3u8Downloader: """download a video fragment. 
""" - if url in self.fragments: - logger.info("skip downloaded fragment: %s", url) - return fragment_full_name = self.mirror_url_resource(url) if fragment_full_name: logger.info("fragment created at: %s", fragment_full_name) + return (url, fragment_full_name) + + def fragment_downloaded(self, result): + """apply_async callback. + + """ + url, fragment_full_name = result self.fragments[url] = fragment_full_name + def fragment_download_failed(self, e): # pylint: disable=no-self-use + """apply_async error callback. + + """ + try: + raise e + except Exception: # pylint: disable=broad-except + # I don't have the url in the run time exception. hope requests + # exception have it. + logger.exception("fragment download failed") + + def download_fragments(self, fragment_urls): + """download fragments. + + """ + pool = multiprocessing.Pool(self.poolsize) + for url in fragment_urls: + if url in self.fragments: + logger.info("skip downloaded fragment: %s", url) + continue + pool.apply_async(self.download_fragment, (url,), + callback=self.fragment_downloaded, + error_callback=self.fragment_download_failed) + pool.close() + pool.join() + def process_media_playlist(self, url, content=None): """replicate every file on the playlist in local temp dir. 
@@ -176,14 +224,20 @@ class M3u8Downloader: self.media_playlist_localfile = self.mirror_url_resource(url) if content is None: content = get_url_content(url) + + fragment_urls = [] for line in content.decode("utf-8").split('\n'): if line.startswith('#EXT-X-KEY'): self.download_key(url, line) - if line.startswith('#'): + continue + if line.startswith('#') or line.strip() == '': continue if line.endswith(".m3u8"): raise RuntimeError("media playlist should not include .m3u8") - self.download_fragment(urljoin(url, line)) + fragment_urls.append(urljoin(url, line)) + + self.download_fragments(fragment_urls) + logger.info("media playlist all fragments downloaded") def process_master_playlist(self, url, content): """choose the highest quality media playlist, and download it. @@ -226,11 +280,15 @@ def main(): try: ofile = sys.argv[1] url = sys.argv[2] + if len(sys.argv) > 3: + tempdir = sys.argv[3] + else: + tempdir = get_fullpath('~/.cache/m3u8downloader') except IndexError: logger.error("Usage: m3u8 OUTPUT_FILE URL") sys.exit(1) SESSION.headers.update({'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.119 Safari/537.36'}) - downloader = M3u8Downloader(url, ofile) + downloader = M3u8Downloader(url, ofile, tempdir) downloader.start() diff --git a/m3u8downloader/sanity_check.py b/m3u8downloader/sanity_check.py index c378282..e481210 100644 --- a/m3u8downloader/sanity_check.py +++ b/m3u8downloader/sanity_check.py @@ -10,7 +10,7 @@ import logging from wells.utils import check -from m3u8downloader.config import CONF +from m3u8downloader.config import CONF # pylint: disable=unused-import # from m3u8downloader import db logger = logging.getLogger(__name__) diff --git a/m3u8downloader/test_main.py b/m3u8downloader/test_main.py index 14c6b8b..74feebd 100644 --- a/m3u8downloader/test_main.py +++ b/m3u8downloader/test_main.py @@ -10,6 +10,8 @@ import os.path from m3u8downloader.main import get_suffix_from_url from 
m3u8downloader.main import is_higher_resolution from m3u8downloader.main import get_url_path +from m3u8downloader.main import get_basename +from m3u8downloader.main import get_fullpath # test for join @@ -36,3 +38,21 @@ def test_is_higher_resolution(): assert is_higher_resolution("480x854", None) assert not is_higher_resolution("480x854", "720x1280") assert is_higher_resolution("720x1280", "480x854") + + +# test for get_basename +def test_get_basename(): + assert get_basename("foo.mp4") == "foo" + assert get_basename("~/d/t2/foo.mp4") == "foo" + assert get_basename("d/t2/foo.mp4") == "foo" + assert get_basename("./foo.mp4") == "foo" + assert get_basename("./foo") == "foo" + + +# test for get_fullpath +def test_get_fullpath(): + assert get_fullpath("foo") == os.path.abspath(os.path.join(os.curdir, "foo")) + assert get_fullpath("foo/") == os.path.abspath(os.path.join(os.curdir, "foo")) + assert get_fullpath("foo/bar") == os.path.abspath(os.path.join(os.curdir, "foo", "bar")) + assert get_fullpath("~/foo/") == os.path.expanduser("~/foo") + assert get_fullpath("$HOME/foo/") == os.path.expanduser("~/foo") diff --git a/operational b/operational index 9b3d420..8bbb880 100644 --- a/operational +++ b/operational @@ -4,24 +4,45 @@ Time-stamp: <2019-03-03> #+STARTUP: content * notes :entry: * later :entry: -* current :entry: -** -** 2019-03-03 some list is very long. 695 fragments. I need a thread pool. -5 concurrent worker. +** 2019-03-03 create a chrome extension, click button to send page url to ~/bin/mm +to handle it on ryzen5. -self.process_media_playlist() +- chrome extension. + do fetch call to + GET https://mm.emacsos.com/api/mm?url=xxx -self.download_fragment(urljoin(url, line)) + this api should check whether url is supported. if so, store url in + db/persistent queue and return immediately, and do parsing and download in + other thread. if not, return some err msg. -this is the key part I need concurrency. 
+ This api should be able to handle both pictures and m3u8 videos. -search: python you can switch between threadpool processpool etc +- chrome extension can view downloads. + get data from + GET https://mm.emacsos.com/api/history + + this will have data for: + | url | page title | task add time | dl progress | dl time | target file | + |------------+------------+---------------------+---------------+---------+-------------| + | foo.com/a/ | xxx | 2019-03-03 11:42:59 | [xxxxxx.....] | 25s | - | + | | | | | | | + +- maybe I can host mm.emacsos.com on the public internet. + add some static token based auth support. + + This allows me to send urls to download from mobile devices. + but chrome mobile and safari don't support extensions. + opening a website to store a url is no better than just storing it in a notes app. -concurrent.future? + won't do this. just send request to localhost. + make the api endpoint configurable. +* current :entry: +** ** 2019-03-03 add progress tracking log. do a commit before I add this. +* done :entry: ** 2019-03-02 my plan - target temp dir: subdir and filename same as original path. @@ -31,7 +52,52 @@ do a commit before I add this. - run ffmpeg command: ffmpeg -allowed_extensions ALL -i local.m3u8 -c copy -bsf:a aac_adtstoasc all.mp4 -* done :entry: +** 2019-03-03 some list is very long. 695 fragments. I need a thread pool. +5 concurrent worker. + +self.process_media_playlist() + +self.download_fragment(urljoin(url, line)) + +this is the key part I need concurrency. + +search: python you can switch between threadpool processpool etc + +multiprocessing.Pool works. + +- if I use multiprocessing.Pool.map, I no longer have progress info. + could use external db for progress tracking. + + if I use multiprocessing worker, self.fragments[url] = fragment_full_name + will no longer work. the worker process will get a copy of every variable. + + seems not a good idea. + + use pool.apply_async instead. 
+ this can have callback which can modify self.fragments[url]. + +- dev + - the check "if url in self.fragments" won't work. + it's a copy, not realtime updated distributed dict. + fixed. do the check in main thread. + - ffmpeg output is not reproducible anymore? + ['ffmpeg', '-allowed_extensions', 'ALL', '-i', '/home/sylecn/projects/m3u8downloader/tmp-foo2/20190227/D9cU9xCM/480kb/hls/index.m3u8', '-acodec', 'copy', '-vcodec', 'copy', '-bsf:a', 'aac_adtstoasc', '/home/sylecn/d/t2/foo2.mp4'] + + ffmpeg -allowed_extensions ALL -i /home/sylecn/projects/m3u8downloader/tmp-foo2/20190227/D9cU9xCM/480kb/hls/index.m3u8 -acodec copy -vcodec copy -bsf:a aac_adtstoasc /home/sylecn/d/t2/foo3.mp4 + ll -sh /home/sylecn/d/t2/foo3.mp4 + + diff /home/sylecn/d/t2/foo3.mp4 /home/sylecn/d/t2/foo2.mp4 + it works okay. try it again using redownloaded files. + + cp ~/d/t2/foo2.mp4 ~/d/t2/foo2-bak.mp4 + make run + diff /home/sylecn/d/t2/foo2.mp4 ~/d/t2/foo2-bak.mp4 + They do differ. maybe it's just metadata? + + foo2-bak has pre-episode ad seconds. + foo2 doesn't have. + - ffmpeg created video has glitches. + ** 2019-03-03 try enable http keepalive and connection reuse. I see lot of starting new http connection to the same site. @@ -57,3 +123,18 @@ The methods defined are: NONE, AES-128, and SAMPLE-AES. since ffmpeg already handle this. I don't need to know the details. * wontfix :entry: +** 2019-03-03 ffmpeg created video from local m3u8 file has glitches. +especially for the first few seconds. + +- try play the local m3u8 file in chrome. + + mmv http://sex8.cc/thread-11588980-1-1.html + create a html file myself. + html