|
- # demotape.py checks regulary the webstreams of all district parlaments
- # in Vienna. If a webstream is online, it gets recorded into seperate
- # directories per district.
-
- import os
- import sys
- import time
- from datetime import datetime
- import random
- import m3u8
- import youtube_dl
- import asyncio
- import concurrent.futures
- import ntpath
- import yaml
- from pathlib import Path
- import logging
-
- logging.basicConfig(filename='/var/log/demotape.log', level=logging.INFO)
- logging.info("Starting demotape service at " + str(datetime.utcnow()))
-
- config_path = Path(__file__).parent / './config.yaml'
- with config_path.open() as file:
- config = yaml.load(file, Loader=yaml.FullLoader)
-
- try:
- if sys.argv[1] and os.path.exists(sys.argv[1]):
- ROOT_PATH = sys.argv[1]
- logging.info('Root path for downloaded streams: ' + ROOT_PATH)
- else:
- logging.info('destination path does not exist')
- sys.exit()
- except IndexError:
- logging.info('Script needs a valid destination path for recorded videos as argument')
- logging.info('For example: \ndemotape.py /path/to/videos')
- sys.exit()
-
-
- def timestamp():
- dateTimeObj = datetime.now()
- return '[ ' + dateTimeObj.strftime("%F %H:%M:%S.%f") + ' ] '
-
-
-
- def generate_channellist():
- channels = []
- districts = range(1, 23 + 1) # districts of vienna
-
- for district_num in districts:
- # district_str = str(district_num)
- district_str_lz = str(district_num).zfill(2) # leading zero
- channel = {
- 'name': '1' + district_str_lz + '0', # 1010 - 1230
- 'url': 'https://stream.wien.gv.at/live/ngrp:bv' + district_str_lz + '.stream_all/playlist.m3u8'
- }
- channels.append(channel)
- logging.info('channels:')
- for channel in channels:
- logging.info(channel['name'] + ' ' + channel['url'])
- return channels
-
-
- def check_stream(url):
- playlist = m3u8.load(url)
- try:
- if playlist.data['playlists']:
- # has active live stream
- return True
- else:
- # no livestream
- return False
- except (ValueError, KeyError):
- logging.info('some connection error or so')
-
-
-
- class MyLogger(object):
- def debug(self, msg):
- #pass
- logging.info(msg)
-
- def warning(self, msg):
- #pass
- logging.info(msg)
-
- def error(self, msg):
- logging.info(msg)
-
-
- def my_ytdl_hook(d):
- if d['status'] == 'finished':
- logging.info(timestamp() + 'Done downloading!')
- else:
- logging.info(timestamp() + 'sth went wrong' + d['status'])
- logging.info(d)
-
-
- def download_stream(channel, dest_path):
- logging.info(timestamp() + ' download_stream')
- ytdl_opts = {
- 'logger': MyLogger(),
- 'outtmpl': dest_path,
- 'format': 'bestaudio/best',
- # 'recodevideo': 'mp4',
- # 'postprocessors': [{
- # 'key': 'FFmpegVideoConvertor',
- # 'preferedformat': 'mp4',
- # 'preferredquality': '25',
- # }],
- # should just stop after a few retries and start again instead of hanging in the loop of trying to download
- 'retries': 3,
- 'fragment-retries': 3,
- 'progress_hooks': [my_ytdl_hook]
- }
- ytdl = youtube_dl.YoutubeDL(ytdl_opts)
-
- try:
- logging.info(timestamp() + " Downloading: " + channel['url'])
- ytdl.download([channel['url']])
- except (youtube_dl.utils.DownloadError) as e:
- logging.info(timestamp() + " Download error: " + str(e))
- except (youtube_dl.utils.SameFileError) as e:
- logging.info("Download error: " + str(e))
- except (UnicodeDecodeError) as e:
- logging.info("UnicodeDecodeError: " + str(e))
-
-
- def process_channel(channel):
- #logging.info('entered function process_channel with ' + channel['name'])
- while True:
-
- logging.info(timestamp() + ' checking ' + channel['name'])
- if check_stream(channel['url']):
- logging.info(channel['name'] + ': stream online! Downloading ...')
- dest_dir = ROOT_PATH + '/' + channel['name'] +'/'
- # create directory if it doesn't exist
- if not os.path.exists(dest_dir):
- logging.info('creating directory ' + dest_dir)
- os.makedirs(dest_dir)
- dest_path = get_destpath(channel) # dirctory + filename
- download_stream(channel, dest_path) # also converts video
- logging.info(timestamp() + " Uploading video " + dest_path)
- upload_video(dest_path)
- else:
- waitingtime = random.randint(50,60)
- time.sleep(waitingtime)
-
-
- logging.info('end processing ' + channel['name'] + ' ... (shouldn\'t happen!)')
-
-
- def upload_video(videofile_path):
- logging.info('uploading %s' % (videofile_path))
- credentials = config['webdav']['username'] + ':' + config['webdav']['password']
- webdav_baseurl = config['webdav']['base_url']
- filename = ntpath.basename(videofile_path)
- webdav_url = webdav_baseurl + filename
- try:
- # Upload to cloud using webdav
- result = os.system('curl -L -u %s -T "%s" "%s"' % (credentials, videofile_path, webdav_url))
- if result == 0: # exit code
- delete_video(videofile_path)
- return true
- except:
- logging.info('Error while uploading %s to %s' % (file, webdav_url))
-
-
- def delete_video(file):
- try:
- os.system('rm -rf "%s"' % (file))
- return true
- except:
- logging.info('Error while deleting %s' % (file))
-
-
- def get_destpath(channel):
- now = datetime.now() # current date and time
- dest_dir = ROOT_PATH + '/' + channel['name'] +'/'
- dest_filename = channel['name'] + "_" + now.strftime("%Y-%m-%d--%H.%M.%S") + '.mp4'
- return dest_dir + dest_filename
-
-
- def main():
- channels = generate_channellist()
-
- with concurrent.futures.ThreadPoolExecutor(max_workers=23) as executor:
- future_to_channel = {executor.submit(process_channel, channel): channel for channel in channels}
-
- for future in concurrent.futures.as_completed(future_to_channel):
- channel = future_to_channel[future]
- try:
- data = future.result()
- except Exception as exc:
- logging.info('%r generated an exception: %s' % (channel, exc))
- else:
- logging.info('%r page is %d bytes' % (channel, len(data)))
-
-
- logging.info('end main (this shouldn\'t happen!)')
-
-
- main()
-
-
-
-
-
-
- #test_channel = {
- # 'name': 'Test Channel',
- # 'url': 'https://1000338copo-app2749759488.r53.cdn.tv1.eu/1000518lf/1000338copo/live/app2749759488/w2928771075/live247.smil/playlist.m3u8'
- # }
- #download_stream(test_channel)
|