Source code for ytdl2rss

#!/usr/bin/env python3
# PYTHON_ARGCOMPLETE_OK
"""
Create podcast RSS from youtube-dl info JSON.

The info JSON produced by youtube-dl is not formally specified, although
attempts have been made to do so, such as
https://github.com/ytdl-org/youtube-dl/pull/21822
Functions in this module expect inputs to follow this proposed schema, although
reasonable attempts will be made to accommodate files encountered in practice.

Values which are null or missing will be omitted from RSS output where
possible.
"""

import argparse
import codecs
import json
import locale
import logging
import re
import sys
import traceback

from collections.abc import Callable, Iterable, Sequence
from datetime import datetime
from email.utils import format_datetime
from pathlib import Path
from typing import IO, TYPE_CHECKING, Any, NotRequired, TypedDict, cast
from urllib.parse import urljoin, urlparse
from urllib.request import pathname2url, url2pathname
from xml.parsers.expat import ExpatError, ParserCreate
from xml.sax.saxutils import escape, quoteattr  # nosec

if TYPE_CHECKING:
    from io import TextIOBase

try:
    from argcomplete import autocomplete

    _HAVE_AUTOCOMPLETE = True
except ImportError:
    _HAVE_AUTOCOMPLETE = False

# Version of this program.  It is embedded in the --version message and in the
# <generator> element of generated feeds.
# Note: Must comply with https://peps.python.org/pep-0440/
__version__ = '0.1.0'

# Public API of this module: the names exported by a star-import.
__all__ = [
    'YtdlEntry',
    'YtdlFormat',
    'YtdlPlaylist',
    'entries_to_playlist',
    'entry_to_rss',
    'get_entry_media_type',
    'main',
    'playlist_to_rss',
]

# Key under which the path of the originating info JSON file is stored on
# loaded entry/playlist dicts.  A unique object is used (rather than a string)
# so it can never collide with a key present in the JSON itself.
_JSON_PATH_KEY = object()
# User-visible program name.
# Must be stable and reliable.  Not deduced from __name__, __file__, or argv[0].
_PROG_NAME = 'ytdl2rss'
# Text printed by --version.
# argparse expands it using printf-style formatting with a ``prog`` mapping, so
# every placeholder must be spelled ``%(prog)s``.  Without the trailing ``s``,
# "%(prog) is" is parsed as an integer conversion ("%(prog) i") and formatting
# raises TypeError.
_VERSION_MESSAGE = (
    '%(prog)s '
    + __version__
    + """

%(prog)s is free and unencumbered software released into the public domain.

%(prog)s is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE.  See the Unlicense for details."""
)
# Matches a single markup tag, for stripping tags out of XML fragments.
_XML_TAG_RE = re.compile('<[^>]+>')

_logger = logging.getLogger(__name__)


class YtdlFormat(TypedDict):
    """
    Type of formats in JSON produced by --write-info-json for a video.

    Note: Only includes attributes used in this script.
    """

    # File name extension of the media; drives media type selection.
    ext: str
    # Audio codec name.  get_entry_media_type treats a missing key, None and
    # the string 'none' alike as "no audio codec known".
    acodec: NotRequired[str | None]
    # Video codec name, with the same missing/None/'none' handling as acodec.
    vcodec: NotRequired[str | None]
    # Size of the media file (presumably in bytes - confirm against
    # youtube-dl); written as the <enclosure> length attribute when present.
    filesize: NotRequired[int]
class YtdlEntry(YtdlFormat):
    """
    Type of JSON produced by --write-info-json for a video.

    Note: Only includes attributes used in this script.
    """

    # Video identifier; used as <guid> when webpage_url is absent or empty,
    # and when guessing the downloaded file name.
    id: str
    # Page URL of the video; written as a permalink <guid>.
    webpage_url: str
    # Item <title>.
    title: str
    # Upload date in YYYYMMDD format; converted to RFC 2822 for <pubDate>.
    upload_date: str
    # Duration written to <itunes:duration>; floats are rounded first.
    duration: NotRequired[int | float]
    # Any value above 0 is reported as explicit in <itunes:explicit>.
    age_limit: int
    # Item <description>.
    description: str
    # Available formats.  A list here is how a video info file is told apart
    # from a playlist info file when loading.
    formats: list[YtdlFormat]
    # Thumbnail URL (absolute or relative); written as <itunes:image>.
    thumbnail: str
    # Title used when guessing the downloaded file name.
    fulltitle: str
    # Path of the downloaded media file, relative to the info JSON file.
    _filename: NotRequired[str]
    # Language of the entry.  When all entries of a playlist that have a
    # language agree on it, it is written as the channel <language>.
    language: NotRequired[str | None]
class YtdlPlaylist(TypedDict):
    """
    Type of JSON produced by --write-info-json for a playlist.

    Note: Only includes attributes used in this script.
    """

    # Videos in the playlist; each becomes an RSS <item>.
    entries: list[YtdlEntry]
    # Channel <title> (also reused as the <image> title).
    title: NotRequired[str | None]
    # Channel <description>.
    description: NotRequired[str | None]
    # Written as <itunes:author>.
    uploader: NotRequired[str | None]
    # Channel <link> (also reused as the <image> link).
    webpage_url: NotRequired[str | None]
    # Date in YYYYMMDD format for the channel <pubDate>.  When absent, the
    # latest entry upload date is used instead.
    upload_date: NotRequired[str | None]
    # Thumbnail URL (absolute or relative); written as <image> and
    # <itunes:image>.
    thumbnail: NotRequired[str | None]
def _to_xml_text(text_or_html: str) -> str: """ Convert input text/HTML into XML text content for use in RSS. Parsers are inconsistent about whether title/description is HTML or plain text. Golden files includes examples of both. To handle these cases, escape only where necessary to avoid double-escaping entities. FIXME: Should escape based on extractor used, so that HTML-like strings in plain text titles are escaped and not treated as HTML. :param text_or_html: Text to escape, if not already valid XML. :return: ``text_or_xml``, XML-escaped if necessary. """ if '&' not in text_or_html and '<' not in text_or_html: # No characters which require escaping in fragment return text_or_html parser = ParserCreate() try: parser.Parse(f'<root>{text_or_html}</root>', True) # noqa: FBT003 except ExpatError: # text_or_html is not a valid XML fragment. Escape it. return escape(text_or_html) # text_or_html is a valid XML fragment. # strip tags, which are not allowed in title: # https://validator.w3.org/feed/docs/warning/ContainsHTML.html return _XML_TAG_RE.sub('', text_or_html) def _resolve_path( path: str, src_path: str, dst_path: str | None = None, dst_url: str | None = None, ) -> str: """Resolve a path in src_path to a URL in dst_path served at dst_url.""" src_dir = Path(src_path).parent cur_path = src_dir / path dst_dir = Path(dst_path or '.').parent rel_path = cur_path.relative_to(dst_dir) rel_url = pathname2url(rel_path.as_posix()) return urljoin(dst_url, rel_url) if dst_url else rel_url def _resolve_url( url: str, src_path: str, dst_path: str | None = None, dst_url: str | None = None, ) -> str: """Resolve a URL in src_path to a URL in dst_path served at dst_url.""" url_parts = urlparse(url) if url_parts.scheme: # url is absolute return url if url_parts.netloc: # url is scheme-relative if not dst_url: raise ValueError("Can't resolve scheme-relative URL without base") return urljoin(dst_url, url) # Resolve url from containing file url_path = url2pathname(url) return 
_resolve_path(url_path, src_path, dst_path, dst_url) def _ymd_to_rfc2822(datestr: str) -> str: """Convert a date in YYYYMMDD format to RFC 2822 for RSS.""" return format_datetime(datetime.strptime(datestr, '%Y%m%d')) # noqa: DTZ007 # pylint: disable-next=too-many-branches def _get_base_media_type( # noqa: C901, PLR0912 ext: str, acodec: str | None, vcodec: str | None ) -> str: """Get media type, without parameters, from youtube-dl JSON entry info.""" media_type = 'audio/' if acodec and not vcodec else 'video/' if ext == '3g2': media_type += '3gpp2' elif ext == '3gp': media_type += '3gpp' elif ext == 'avi': media_type = 'video/vnd.avi' elif ext in ( 'f4a', 'f4b', 'f4p', 'm4a', 'm4b', 'm4p', 'm4r', ): # These extensions are intended for audio. # If codecs are not known, assume it is audio. if not acodec and not vcodec: media_type = 'audio/mp4' else: media_type += 'mp4' elif ext in ('f4v', 'm4v'): media_type += 'mp4' elif ext == 'flv': media_type = 'video/x-flv' elif ext == 'gif': media_type = 'image/gif' elif ext in ('mk3d', 'mks', 'mkv'): media_type += 'x-matroska' elif ext == 'mka': # This extension is intended for audio. # If codecs are not known, assume it is audio. if not acodec and not vcodec: media_type = 'audio/' media_type += 'x-matroska' elif ext == 'mp3': media_type = 'audio/mpeg' elif ext == 'ogg': # Xiph recommends this extension for (vorbis) audio and ogv for video. # If video codec not known, assume it is audio. if not vcodec: media_type = 'audio/' media_type += 'ogg' elif ext == 'ogv': media_type += 'ogg' elif ext == 'wav': media_type = 'audio/vnd.wave' else: media_type += ext return media_type
def get_entry_media_type(entry: YtdlFormat) -> str:
    """
    Get media type (i.e. MIME type) from youtube-dl JSON entry info.

    The result carries a ``codecs`` parameter (RFC 6381) whenever a codec is
    known and the container is not one of flv, gif or mp3.

    :param entry: Entry or format for which to get a media type.
    :return: Media type suitable for ``entry``
    """
    ext = entry['ext']

    # A codec of 'none' means the same as no codec at all.
    acodec = entry.get('acodec')
    vcodec = entry.get('vcodec')
    if acodec == 'none':
        acodec = None
    if vcodec == 'none':
        vcodec = None

    if ext == 'opus':
        # Note: ext: opus could be used to refer to "raw" audio/opus.
        # However, this has not been observed on ytdl-supported sites.
        # Since Xiph recommends .opus for Opus-in-Ogg
        # https://wiki.xiph.org/index.php/MIMETypesCodecs
        # and the ytdl extractor for media.ccc.de uses it this way,
        # unconditionally convert to ogg.
        # If uses of audio/opus are found, consider how to differentiate.
        ext = 'ogg'
        if acodec is None:
            acodec = 'opus'

    media_type = _get_base_media_type(ext, acodec, vcodec)

    codec_known = bool(acodec or vcodec)
    if not codec_known or ext in ('flv', 'gif', 'mp3'):
        return media_type

    # Some extractors (e.g. media.ccc.de) use vcodec: h264
    # Section 3.3 of RFC 6381 specifies codecs must be a FOURCC
    if vcodec == 'h264':
        vcodec = 'avc1'

    if acodec and vcodec:
        # Note: Add space after , as in RFC 6381 section 3.6 Examples
        # TODO: Apply encoding from RFC 2231 if required, see examples
        # in RFC 6381 section 3.1
        codecs_value = ''.join(('"', vcodec, ', ', acodec, '"'))
    else:
        codecs_value = cast('str', acodec or vcodec)

    # Add codecs parameter from https://tools.ietf.org/html/rfc6381
    # Note: Add space after ; as in RFC 6381 section 3.6 Examples
    return media_type + '; codecs=' + codecs_value
def _guess_entry_filename(entry: YtdlEntry) -> str: """ Guess the file name to which youtube-dl would download a JSON entry. :param entry: Entry for which to guess the file name. :return: A file name to which ``entry`` would be saved by youtube-dl. """ return f'{entry["fulltitle"]}-{entry["id"]}.{entry["ext"]}' def _write_explicit_for_age_limit( write: Callable[[str], Any], age_limit: int, ) -> None: """ Write an appropriate <itunes:explicit> tag based on age_limit. Currently this function considers any age limit to be explicit. Standards have differed over valid itunes:explicit values: - Spotify 1.6 wanted yes/no/clean for item, yes/clean for channel. - Spotify 1.10 wants clean/yes/no/true/false. - Google wanted yes or absent. - Apple wants true/false, - W3C Feed Validator wanted yes/no/clean, now true/false - https://github.com/w3c/feedvalidator/issues/112 :param write: Function called to write RSS data. :param age_limit: Age limit from youtube-dl info. """ write('<itunes:explicit>') # Note: newgrounds has 13 (t/teen/PG-13) and 17 (m/mature/R) # Both probably qualify as explicit on iTunes write('true' if age_limit > 0 else 'false') write('</itunes:explicit>') # pylint: disable-next=too-many-branches,too-many-locals,too-many-statements
def entry_to_rss(
    entry: YtdlEntry,
    write: Callable[[str], Any],
    rss_url: str | None = None,
    rss_path: str | None = None,
    indent: str | None = None,
) -> None:
    """
    Convert youtube-dl entry info object to podcast RSS.

    Writes a single ``<item>`` element.  Optional child elements are only
    written when the corresponding entry value has the expected type.

    :param entry: Entry for which to generate RSS.  It must carry the path of
        its info JSON file under the module's private path key.
    :param write: Function called to write RSS data.
    :param rss_url: URL of RSS file being written.
    :param rss_path: Path to RSS file being written.
    :param indent: Indent to apply to each nesting level of RSS.  ``None``
        disables both indentation and line breaks.
    """
    pretty = indent is not None
    item_indent = indent * 2 if pretty else ''
    child_indent = indent * 3 if pretty else ''
    newline = '\n' if pretty else ''

    def begin(markup: str) -> None:
        """Write the indent of an item child followed by ``markup``."""
        write(child_indent)
        write(markup)

    def end(markup: str) -> None:
        """Write ``markup`` followed by the end of line."""
        write(markup)
        write(newline)

    json_path = entry[_JSON_PATH_KEY]  # type: ignore[literal-required]

    write(item_indent)
    end('<item>')

    # Prefer the page URL as a permalink guid; fall back to the bare id.
    webpage_url = entry.get('webpage_url')
    if webpage_url:
        begin('<guid isPermaLink="true">')
        write(escape(webpage_url))
    else:
        begin('<guid>')
        write(escape(entry['id']))
    end('</guid>')

    title = entry.get('title')
    if isinstance(title, str):
        begin('<title>')
        write(_to_xml_text(title))
        end('</title>')

    upload_date = entry.get('upload_date')
    if isinstance(upload_date, str):
        begin('<pubDate>')
        write(_ymd_to_rfc2822(upload_date))
        end('</pubDate>')

    # The enclosure points at the downloaded media file, located relative to
    # the info JSON file it was described by.
    filename = entry.get('_filename') or _guess_entry_filename(entry)
    fileurl = _resolve_path(filename, json_path, rss_path, rss_url)
    filesize = entry.get('filesize')
    media_type = get_entry_media_type(entry)
    begin('<enclosure')
    if media_type is not None:
        write(' type=')
        write(quoteattr(media_type))
    if filesize is not None:
        write(' length=')
        write(quoteattr(str(filesize)))
    write(' url=')
    write(quoteattr(fileurl))
    end('/>')

    thumbnail = entry.get('thumbnail')
    if isinstance(thumbnail, str):
        thumbnail = _resolve_url(thumbnail, json_path, rss_path, rss_url)
        begin('<itunes:image href=')
        write(quoteattr(thumbnail))
        end('/>')

    duration = entry.get('duration')
    if isinstance(duration, float):
        # W3C Feed Validation Service complains about fractional duration:
        # https://validator.w3.org/feed/docs/error/InvalidDuration.html
        duration = round(duration)
    if isinstance(duration, int):
        begin('<itunes:duration>')
        # Spotify: "Different duration formats are accepted however it is
        # recommended to convert the length of the episode into seconds."
        write(str(duration))
        end('</itunes:duration>')

    age_limit = entry.get('age_limit')
    if isinstance(age_limit, int):
        write(child_indent)
        _write_explicit_for_age_limit(write, age_limit)
        write(newline)

    # TODO: <itunes:order> from autonumber (not in .info.json)
    # or playlist_index (may not be relevant/sequential for single file)
    # or sorted order?

    description = entry.get('description')
    if isinstance(description, str):
        begin('<description>')
        # Note: HTML is allowed in item-level descriptions:
        # https://cyber.harvard.edu/rss/encodingDescriptions.html
        write(escape(description))
        end('</description>')

    write(item_indent)
    end('</item>')
def _playlist_to_rss_language( playlist: YtdlPlaylist, write: Callable[[str], Any], indent: str | None = None, ) -> None: """ Write language RSS tag for youtube-dl playlist info object. :param playlist: Playlist for which to generate RSS. :param write: Function called to write RSS data. :param indent: Indent to apply to each nesting level of RSS. """ if indent is None: indent2 = '' eol = '' else: indent2 = indent * 2 eol = '\n' languages = { entry.get('language') for entry in playlist['entries'] if entry.get('language') is not None } if len(languages) == 1: for language in languages: if isinstance(language, str): write(indent2) write('<language>') write(language) write('</language>') write(eol) # pylint: disable-next=too-many-branches,too-many-locals,too-many-statements
def playlist_to_rss(
    playlist: YtdlPlaylist,
    write: Callable[[str], Any],
    rss_url: str | None = None,
    rss_path: str | None = None,
    indent: str | None = None,
) -> None:
    """
    Convert youtube-dl playlist info object to podcast RSS.

    Writes the ``<rss>`` and ``<channel>`` elements, the channel metadata
    available in ``playlist``, and one ``<item>`` per entry.  Channel
    elements whose value is missing or not of the expected type are omitted.

    :param playlist: Playlist for which to generate RSS.
    :param write: Function called to write RSS data.
    :param rss_url: URL of RSS file being written.
    :param rss_path: Path to RSS file being written.
    :param indent: Indent to apply to each nesting level of RSS.  ``None``
        disables both indentation and line breaks (apart from the final
        newline after ``</rss>``).
    """
    if indent is None:
        indent1 = ''
        indent2 = ''
        indent3 = ''
        eol = ''
    else:
        indent1 = indent
        indent2 = indent * 2
        indent3 = indent * 3
        eol = '\n'

    json_path = playlist.get(_JSON_PATH_KEY)  # type: ignore[call-overload]

    write(
        '<rss version="2.0"'
        ' xmlns:atom="http://www.w3.org/2005/Atom"'
        ' xmlns:itunes="http://www.itunes.com/dtds/podcast-1.0.dtd"'
        '>'
    )
    write(eol)
    write(indent1)
    write('<channel>')
    write(eol)

    title = playlist.get('title')
    if isinstance(title, str):
        write(indent2)
        write('<title>')
        title_xml = _to_xml_text(title)
        write(title_xml)
        write('</title>')
        write(eol)

    # Not produced by youtube-dl:
    description = playlist.get('description')
    if isinstance(description, str):
        write(indent2)
        write('<description>')
        # Note: Although HTML is allowed in item-level descriptions, W3C Feed
        # Validation warns it is not allowed in channel-level descriptions.
        write(_to_xml_text(description))
        write('</description>')
        write(eol)

    uploader = playlist.get('uploader')
    if isinstance(uploader, str):
        write(indent2)
        write('<itunes:author>')
        write(escape(uploader))
        write('</itunes:author>')
        write(eol)

    webpage_url = playlist.get('webpage_url')
    if isinstance(webpage_url, str):
        write(indent2)
        write('<link>')
        webpage_url_xml = escape(webpage_url)
        write(webpage_url_xml)
        write('</link>')
        write(eol)

    upload_date = playlist.get('upload_date')
    if upload_date is None:
        # Fall back to the most recent entry date.  Only string dates take
        # part, and an empty result yields None, so playlists with no entries
        # or with undated entries simply get no <pubDate>.
        entry_dates = [
            entry.get('upload_date') for entry in playlist['entries'] if entry
        ]
        upload_date = max(
            (date for date in entry_dates if isinstance(date, str)),
            default=None,
        )
    if isinstance(upload_date, str):
        write(indent2)
        write('<pubDate>')
        write(_ymd_to_rfc2822(upload_date))
        write('</pubDate>')
        write(eol)

    # Not produced by youtube-dl:
    # https://github.com/ytdl-org/youtube-dl/issues/16130
    thumbnail = playlist.get('thumbnail')
    if isinstance(thumbnail, str):
        thumbnail = _resolve_url(thumbnail, json_path, rss_path, rss_url)
        write(indent2)
        write('<image>')
        write(eol)
        write(indent3)
        write('<url>')
        write(escape(thumbnail))
        write('</url>')
        write(eol)
        # "Note, in practice the image <title> and <link> should have the
        # same value as the channel's <title> and <link>."
        # https://www.rssboard.org/rss-specification#ltimagegtSubelementOfLtchannelgt
        if isinstance(title, str):
            write(indent3)
            write('<title>')
            write(title_xml)
            write('</title>')
            write(eol)
        if isinstance(webpage_url, str):
            write(indent3)
            write('<link>')
            write(webpage_url_xml)
            write('</link>')
            write(eol)
        write(indent2)
        write('</image>')
        write(eol)

        # Apple instructs podcasters to use <itunes:image>, doesn't document
        # standardized <image>.  Include both.
        write(indent2)
        write('<itunes:image href=')
        write(quoteattr(thumbnail))
        write('/>')
        write(eol)

    _playlist_to_rss_language(playlist, write, indent)

    # The channel is only rated when every entry has an age limit; the
    # strictest (highest) limit then decides.
    age_limits = [entry.get('age_limit') for entry in playlist['entries']]
    if age_limits and None not in age_limits:
        write(indent2)
        _write_explicit_for_age_limit(write, max(age_limits))
        write(eol)

    # Provide self link, as recommended
    # https://validator.w3.org/feed/docs/warning/MissingAtomSelfLink.html
    if rss_url:
        write(indent2)
        write('<atom:link rel="self" type="application/rss+xml" href=')
        write(quoteattr(rss_url))
        write('/>')
        write(eol)

    write(indent2)
    write('<generator>')
    write(escape(_PROG_NAME + ' ' + __version__))
    write('</generator>')
    write(eol)

    for entry in playlist['entries']:
        entry_to_rss(entry, write, rss_url, rss_path, indent=indent)

    write(indent1)
    write('</channel>')
    write(eol)
    write('</rss>\n')
def _load_json(json_path: str) -> Any: # noqa: ANN401 """Load JSON from a file with a given path.""" # Note: Binary so load can detect encoding (as in Section 3 of RFC 4627) with open(json_path, 'rb') as json_file: try: return json.load(json_file) except ValueError as ex: raise ValueError('Error loading ' + json_path) from ex
def entries_to_playlist(entries: list[YtdlEntry]) -> YtdlPlaylist:
    """
    Combine youtube-dl entries into a playlist with common metadata.

    Playlist metadata is taken from the ``playlist_*`` values of the entries,
    but only when all entries which have such values agree on them.

    :param entries: Entries to combine into a playlist.
    :return: A playlist with entries from ``entries``.
    """
    # Entry keys which describe the playlist the entry belongs to.
    playlist_keys = {
        'playlist_id',
        'playlist_title',
        'playlist_uploader',
        'playlist_uploader_id',
    }

    shared_metadata = None
    for entry in entries:
        metadata = {
            key: value
            for key, value in entry.items()
            if value and key in playlist_keys
        }
        if not metadata:
            # Entry says nothing about its playlist; it cannot disagree.
            continue
        if shared_metadata is None:
            shared_metadata = metadata
            continue
        if metadata != shared_metadata:
            # Entries come from different playlists: no common metadata.
            shared_metadata = None
            break

    playlist: dict[str, Any] = {}
    if shared_metadata:
        # Drop the "playlist_" prefix to get the playlist-level key names.
        for key, value in shared_metadata.items():
            playlist[key.removeprefix('playlist_')] = value

    playlist.update(_type='playlist', entries=entries)
    return cast('YtdlPlaylist', playlist)
def _load_info(info_paths: Iterable[str]) -> YtdlPlaylist: """Load youtube-dl JSON info files into a single playlist object.""" entries: list[YtdlEntry] = [] info_count = 0 last_playlist: YtdlPlaylist | None = None for info_path in info_paths: info_count += 1 if info_path == '-': info = json.load(sys.stdin) else: info = _load_json(info_path) info_entries = info.get('entries') has_entries = isinstance(info_entries, list) has_formats = isinstance(info.get('formats'), list) if has_entries == has_formats: raise ValueError('Unrecognized JSON in ' + info_path) if has_formats: # info for a single video info[_JSON_PATH_KEY] = info_path entries.append(info) else: # info for a playlist last_playlist = info info[_JSON_PATH_KEY] = info_path for entry in info_entries: entry[_JSON_PATH_KEY] = info_path entries.extend(info_entries) # If the user provided a single playlist, use it as-is # This lets users easily specify whatever metadata they'd like if info_count == 1 and last_playlist: return last_playlist return entries_to_playlist(entries) # pylint: disable-next=too-many-branches def info_to_rss( info_paths: Iterable[str], rss_url: str | None = None, rss_path: str | None = None, indent: str | None = None, write: Callable[[str], Any] | None = None, ) -> None: """ Convert youtube-dl info JSON files to podcast RSS. :param info_paths: Path to youtube-dl info JSON files. :param rss_url: URL of RSS file being written. :param rss_path: Path of RSS file to produce. :param indent: Indent to apply to each nesting level of RSS. :param write: Function called to write RSS data, instead of using rss_path. :raises ValueError: if ``rss_path`` and ``sys.stdout`` are ``None`` """ if not rss_url or not urlparse(rss_url).scheme: # Note: Not just a spec compliance issue. Affects real aggregators: # https://github.com/AntennaPod/AntennaPod/issues/2880 _logger.warning( 'URLs in RSS 2.0 must be absolute (i.e. 
start with a scheme) per:\n' '- https://www.rssboard.org/rss-specification#comments\n' '- https://cyber.harvard.edu/rss/rss.html#comments\n' '- https://validator.w3.org/feed/docs/error/InvalidURLAttribute.html\n' 'The provided self URL (%s) is not.', rss_url, ) # Note: Could use default locale.getpreferredencoding(). Many users would # "prefer" ISO-8859-1. UTF-8 is a safer default to support more characters # and for wider podcast distributor/aggregator support. # (e.g. Apple instructs podcasters to use UTF-8.) encoding = 'UTF-8' output: TextIOBase | None = None if write: pass elif rss_path: # pylint: disable-next=consider-using-with output = open(rss_path, 'w', encoding=encoding) # noqa: SIM115 write = output.write elif sys.stdout is None: raise ValueError('stdout is closed') elif sys.stdout.isatty(): # TTY unlikely to interpret XML declaration. Use Python's encoding. if sys.stdout.encoding is not None: # pylint: disable-next=redefined-variable-type encoding = sys.stdout.encoding write = sys.stdout.write else: encoding = locale.getpreferredencoding() write = codecs.getwriter(encoding)(sys.stdout).write elif sys.stdout.encoding and sys.stdout.encoding.upper() == encoding: write = sys.stdout.write elif hasattr(sys.stdout, 'buffer'): write = codecs.getwriter(encoding)(sys.stdout.buffer).write else: write = codecs.getwriter(encoding)(cast('IO[bytes]', sys.stdout)).write try: write('<?xml version="1.0" encoding=') write(quoteattr(encoding)) write('?>') if indent is not None: write('\n') playlist_to_rss( _load_info(info_paths), write, rss_url, rss_path, indent=indent, ) finally: if output: output.close() def _parse_indent(indent: str | int) -> str: """Parse indent argument to indent string.""" try: return ' ' * int(indent) except ValueError: return cast('str', indent) def _build_argument_parser( **kwargs: Any, # noqa: ANN401 ) -> argparse.ArgumentParser: """ Build parser for command line options. 
:return: argument parser """ parser = argparse.ArgumentParser( usage='%(prog)s [options] <JSON file...>', description=__doc__, # Use raw formatter to avoid mangling version text formatter_class=argparse.RawDescriptionHelpFormatter, **kwargs, ) parser.add_argument( '-i', '--indent', help='XML indent string, or number of spaces to indent', nargs='?', type=_parse_indent, ) parser.add_argument( '-o', '--output', help='Output RSS file.', ) parser.add_argument( '-q', '--quiet', action='count', help='Decrease verbosity (less detailed output)', ) parser.add_argument( '-S', '--self-url', help='URL of generated RSS, to resolve relative URLs', ) parser.add_argument( '-v', '--verbose', action='count', help='Increase verbosity (more detailed output)', ) parser.add_argument( '-V', '--version', action='version', help='Output version and license information', version=_VERSION_MESSAGE, ) parser.add_argument( 'json_files', nargs='+', metavar='JSON file...', help='youtube-dl .info.json files', ) return parser
def main(argv: Sequence[str] = sys.argv) -> int:
    """
    Entry point for command-line use.

    Parses the command line, configures logging, and writes the RSS feed for
    the given info JSON files.

    :param argv: command-line arguments (usually :py:data:`sys.argv`)
    :return: exit code
    """
    parser = _build_argument_parser(prog=_PROG_NAME)

    if _HAVE_AUTOCOMPLETE:
        # Capture the exit code argcomplete asks for instead of letting it
        # terminate the process, so that it can be returned to the caller.
        completion_exit = None

        def record_exit(code: int = 0) -> None:
            nonlocal completion_exit
            completion_exit = code

        autocomplete(parser, exit_method=record_exit)
        if completion_exit is not None:
            return completion_exit

    options = parser.parse_args(args=argv[1:])

    # Set log level based on verbosity requested (default of INFO).
    # Each -q raises the threshold by one level, each -v lowers it by one.
    quiet_count = options.quiet or 0
    verbose_count = options.verbose or 0
    log_level = logging.INFO + (quiet_count - verbose_count) * 10
    logging.basicConfig(
        format='%(levelname)s: %(message)s',
        level=log_level,
    )

    # Log version to aid debugging
    _logger.debug('ytdl2rss %s', __version__)

    try:
        info_to_rss(
            options.json_files,
            options.self_url,
            rss_path=options.output,
            indent=options.indent,
        )
    except UnicodeEncodeError:
        # TODO: Should use a proper XML writer which would represent
        # characters outside the file encoding using XML entities.
        traceback.print_exc()
        sys.stderr.write(
            'Consider specifying a different encoding in PYTHONIOENCODING.\n'
        )
        return 1
    else:
        return 0
# Run the command-line interface when executed as a script, using the return
# value of main() as the process exit status.
if __name__ == '__main__':
    sys.exit(main())