You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

215 lines
6.6 KiB

  1. # demotape.py checks regulary the webstreams of all district parlaments
  2. # in Vienna. If a webstream is online, it gets recorded into seperate
  3. # directories per district.
  4. import os
  5. import sys
  6. import time
  7. from datetime import datetime
  8. import random
  9. import m3u8
  10. import youtube_dl
  11. import asyncio
  12. import concurrent.futures
  13. import ntpath
  14. import yaml
  15. from pathlib import Path
  16. import logging
  17. logging.basicConfig(filename='/var/log/demotape.log', level=logging.INFO)
  18. logging.info("Starting demotape service at " + str(datetime.utcnow()))
  19. config_path = Path(__file__).parent / './config.yaml'
  20. with config_path.open() as file:
  21. config = yaml.load(file, Loader=yaml.FullLoader)
  22. try:
  23. if sys.argv[1] and os.path.exists(sys.argv[1]):
  24. ROOT_PATH = sys.argv[1]
  25. logging.info('Root path for downloaded streams: ' + ROOT_PATH)
  26. else:
  27. logging.info('destination path does not exist')
  28. sys.exit()
  29. except IndexError:
  30. logging.info('Script needs a valid destination path for recorded videos as argument')
  31. logging.info('For example: \ndemotape.py /path/to/videos')
  32. sys.exit()
  33. def timestamp():
  34. dateTimeObj = datetime.now()
  35. return '[ ' + dateTimeObj.strftime("%F %H:%M:%S.%f") + ' ] '
  36. def generate_channellist():
  37. channels = []
  38. districts = range(1, 23 + 1) # districts of vienna
  39. for district_num in districts:
  40. # district_str = str(district_num)
  41. district_str_lz = str(district_num).zfill(2) # leading zero
  42. channel = {
  43. 'name': '1' + district_str_lz + '0', # 1010 - 1230
  44. 'url': 'https://stream.wien.gv.at/live/ngrp:bv' + district_str_lz + '.stream_all/playlist.m3u8'
  45. }
  46. channels.append(channel)
  47. logging.info('channels:')
  48. for channel in channels:
  49. logging.info(channel['name'] + ' ' + channel['url'])
  50. return channels
  51. def check_stream(url):
  52. playlist = m3u8.load(url)
  53. try:
  54. if playlist.data['playlists']:
  55. # has active live stream
  56. return True
  57. else:
  58. # no livestream
  59. return False
  60. except (ValueError, KeyError):
  61. logging.info('some connection error or so')
  62. class MyLogger(object):
  63. def debug(self, msg):
  64. #pass
  65. logging.info(msg)
  66. def warning(self, msg):
  67. #pass
  68. logging.info(msg)
  69. def error(self, msg):
  70. logging.info(msg)
  71. def my_ytdl_hook(d):
  72. if d['status'] == 'finished':
  73. logging.info(timestamp() + 'Done downloading!')
  74. else:
  75. logging.info(timestamp() + 'sth went wrong' + d['status'])
  76. logging.info(d)
  77. def download_stream(channel, dest_path):
  78. logging.info(timestamp() + ' download_stream')
  79. ytdl_opts = {
  80. 'logger': MyLogger(),
  81. 'outtmpl': dest_path,
  82. 'format': 'bestaudio/best',
  83. # 'recodevideo': 'mp4',
  84. # 'postprocessors': [{
  85. # 'key': 'FFmpegVideoConvertor',
  86. # 'preferedformat': 'mp4',
  87. # 'preferredquality': '25',
  88. # }],
  89. # should just stop after a few retries and start again instead of hanging in the loop of trying to download
  90. 'retries': 3,
  91. 'fragment-retries': 3,
  92. 'progress_hooks': [my_ytdl_hook]
  93. }
  94. ytdl = youtube_dl.YoutubeDL(ytdl_opts)
  95. try:
  96. logging.info(timestamp() + " Downloading: " + channel['url'])
  97. ytdl.download([channel['url']])
  98. except (youtube_dl.utils.DownloadError) as e:
  99. logging.info(timestamp() + " Download error: " + str(e))
  100. except (youtube_dl.utils.SameFileError) as e:
  101. logging.info("Download error: " + str(e))
  102. except (UnicodeDecodeError) as e:
  103. logging.info("UnicodeDecodeError: " + str(e))
  104. def process_channel(channel):
  105. #logging.info('entered function process_channel with ' + channel['name'])
  106. while True:
  107. logging.info(timestamp() + ' checking ' + channel['name'])
  108. if check_stream(channel['url']):
  109. logging.info(channel['name'] + ': stream online! Downloading ...')
  110. dest_dir = ROOT_PATH + '/' + channel['name'] +'/'
  111. # create directory if it doesn't exist
  112. if not os.path.exists(dest_dir):
  113. logging.info('creating directory ' + dest_dir)
  114. os.makedirs(dest_dir)
  115. dest_path = get_destpath(channel) # dirctory + filename
  116. download_stream(channel, dest_path) # also converts video
  117. logging.info(timestamp() + " Uploading video " + dest_path)
  118. upload_video(dest_path)
  119. else:
  120. waitingtime = random.randint(50,60)
  121. time.sleep(waitingtime)
  122. logging.info('end processing ' + channel['name'] + ' ... (shouldn\'t happen!)')
  123. def upload_video(videofile_path):
  124. logging.info('uploading %s' % (videofile_path))
  125. credentials = config['webdav']['username'] + ':' + config['webdav']['password']
  126. webdav_baseurl = config['webdav']['base_url']
  127. filename = ntpath.basename(videofile_path)
  128. webdav_url = webdav_baseurl + filename
  129. try:
  130. # Upload to cloud using webdav
  131. result = os.system('curl -L -u %s -T "%s" "%s"' % (credentials, videofile_path, webdav_url))
  132. if result == 0: # exit code
  133. delete_video(videofile_path)
  134. return True
  135. except:
  136. logging.info('Error while uploading %s to %s' % (file, webdav_url))
  137. def delete_video(file):
  138. try:
  139. logging.info('Deleting Video %s' % (file))
  140. os.system('rm -rf "%s"' % (file))
  141. return True
  142. except:
  143. logging.info('Error while deleting %s' % (file))
  144. def get_destpath(channel):
  145. now = datetime.now() # current date and time
  146. dest_dir = ROOT_PATH + '/' + channel['name'] +'/'
  147. dest_filename = channel['name'] + "_" + now.strftime("%Y-%m-%d--%H.%M.%S") + '.mp4'
  148. return dest_dir + dest_filename
  149. def main():
  150. channels = generate_channellist()
  151. with concurrent.futures.ThreadPoolExecutor(max_workers=23) as executor:
  152. future_to_channel = {executor.submit(process_channel, channel): channel for channel in channels}
  153. for future in concurrent.futures.as_completed(future_to_channel):
  154. channel = future_to_channel[future]
  155. try:
  156. data = future.result()
  157. except Exception as exc:
  158. logging.info('%r generated an exception: %s' % (channel, exc))
  159. else:
  160. logging.info('%r page is %d bytes' % (channel, len(data)))
  161. logging.info('end main (this shouldn\'t happen!)')
  162. main()
  163. #test_channel = {
  164. # 'name': 'Test Channel',
  165. # 'url': 'https://1000338copo-app2749759488.r53.cdn.tv1.eu/1000518lf/1000338copo/live/app2749759488/w2928771075/live247.smil/playlist.m3u8'
  166. # }
  167. #download_stream(test_channel)