#!/usr/bin/env python3
# PYTHON_ARGCOMPLETE_OK
"""
Create podcast RSS from youtube-dl info JSON.
The info JSON produced by youtube-dl is not formally specified, although
attempts have been made to do so, such as
https://github.com/ytdl-org/youtube-dl/pull/21822
Functions in this module expect inputs to follow this proposed schema, although
reasonable attempts will be made to accommodate files encountered in practice.
Values which are null or missing will be omitted from RSS output where
possible.
"""
import argparse
import codecs
import json
import locale
import logging
import re
import sys
import traceback
from collections.abc import Callable, Iterable, Sequence
from datetime import datetime
from email.utils import format_datetime
from pathlib import Path
from typing import IO, TYPE_CHECKING, Any, NotRequired, TypedDict, cast
from urllib.parse import urljoin, urlparse
from urllib.request import pathname2url, url2pathname
from xml.parsers.expat import ExpatError, ParserCreate
from xml.sax.saxutils import escape, quoteattr # nosec
if TYPE_CHECKING:
from io import TextIOBase
try:
from argcomplete import autocomplete
_HAVE_AUTOCOMPLETE = True
except ImportError:
_HAVE_AUTOCOMPLETE = False
# Package version string.
# Note: Must comply with https://peps.python.org/pep-0440/
__version__ = '0.1.0'

# Names exported by ``from <module> import *``.  Helpers whose names start
# with an underscore are intentionally left out.
__all__ = [
    'YtdlEntry',
    'YtdlFormat',
    'YtdlPlaylist',
    'entries_to_playlist',
    'entry_to_rss',
    'get_entry_media_type',
    'main',
    'playlist_to_rss',
]
# Key under which the path of the originating JSON file is stored on loaded
# info objects.  A unique object is used instead of a string so it can never
# collide with a key that came from the JSON itself.
_JSON_PATH_KEY = object()

# User-visible program name.
# Must be stable and reliable.  Not deduced from __name__, __file__, or argv[0].
_PROG_NAME = 'ytdl2rss'

# Text printed by --version.  argparse expands it with printf-style
# formatting, so every program-name placeholder must be written ``%(prog)s``:
# a placeholder without the trailing ``s`` is parsed as a different
# conversion and makes --version fail with TypeError.
_VERSION_MESSAGE = (
    '%(prog)s '
    + __version__
    + """
%(prog)s is free and unencumbered software released into the public domain.
%(prog)s is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE.  See the Unlicense for details."""
)

# Matches a single XML/HTML tag, for stripping markup from text content.
_XML_TAG_RE = re.compile(r'<[^>]+>')

_logger = logging.getLogger(__name__)
[docs]
class YtdlEntry(YtdlFormat):
    """
    Type of JSON produced by --write-info-json for a video.

    Inherits the keys declared by :class:`YtdlFormat`.

    Note: Only includes attributes used in this script.
    """

    # Video identifier; part of the file name guessed for the download.
    id: str
    # NOTE(review): presumably the page from which the video was extracted.
    webpage_url: str
    title: str
    # NOTE(review): presumably YYYYMMDD, the format parsed by
    # _ymd_to_rfc2822 -- confirm against the caller.
    upload_date: str
    # May be absent from the JSON.
    duration: NotRequired[int | float]
    # NOTE(review): presumably the value handed to
    # _write_explicit_for_age_limit -- confirm against the caller.
    age_limit: int
    description: str
    # A list-valued ``formats`` key is what marks an info object as a single
    # video (rather than a playlist) when info files are loaded.
    formats: list[YtdlFormat]
    thumbnail: str
    # Title used when guessing the downloaded file name.
    fulltitle: str
    # May be absent from the JSON.
    _filename: NotRequired[str]
[docs]
class YtdlPlaylist(TypedDict):
    """
    Type of JSON produced by --write-info-json for a playlist.

    Note: Only includes attributes used in this script.
    """

    # Videos in the playlist.  A list-valued ``entries`` key is what marks an
    # info object as a playlist when info files are loaded.
    entries: list[YtdlEntry]
def _to_xml_text(text_or_html: str) -> str:
"""
Convert input text/HTML into XML text content for use in RSS.
Parsers are inconsistent about whether title/description is HTML or plain
text. Golden files includes examples of both. To handle these cases,
escape only where necessary to avoid double-escaping entities.
FIXME: Should escape based on extractor used, so that HTML-like strings in
plain text titles are escaped and not treated as HTML.
:param text_or_html: Text to escape, if not already valid XML.
:return: ``text_or_xml``, XML-escaped if necessary.
"""
if '&' not in text_or_html and '<' not in text_or_html:
# No characters which require escaping in fragment
return text_or_html
parser = ParserCreate()
try:
parser.Parse(f'<root>{text_or_html}</root>', True) # noqa: FBT003
except ExpatError:
# text_or_html is not a valid XML fragment. Escape it.
return escape(text_or_html)
# text_or_html is a valid XML fragment.
# strip tags, which are not allowed in title:
# https://validator.w3.org/feed/docs/warning/ContainsHTML.html
return _XML_TAG_RE.sub('', text_or_html)
def _resolve_path(
path: str,
src_path: str,
dst_path: str | None = None,
dst_url: str | None = None,
) -> str:
"""Resolve a path in src_path to a URL in dst_path served at dst_url."""
src_dir = Path(src_path).parent
cur_path = src_dir / path
dst_dir = Path(dst_path or '.').parent
rel_path = cur_path.relative_to(dst_dir)
rel_url = pathname2url(rel_path.as_posix())
return urljoin(dst_url, rel_url) if dst_url else rel_url
def _resolve_url(
url: str,
src_path: str,
dst_path: str | None = None,
dst_url: str | None = None,
) -> str:
"""Resolve a URL in src_path to a URL in dst_path served at dst_url."""
url_parts = urlparse(url)
if url_parts.scheme:
# url is absolute
return url
if url_parts.netloc:
# url is scheme-relative
if not dst_url:
raise ValueError("Can't resolve scheme-relative URL without base")
return urljoin(dst_url, url)
# Resolve url from containing file
url_path = url2pathname(url)
return _resolve_path(url_path, src_path, dst_path, dst_url)
def _ymd_to_rfc2822(datestr: str) -> str:
"""Convert a date in YYYYMMDD format to RFC 2822 for RSS."""
return format_datetime(datetime.strptime(datestr, '%Y%m%d')) # noqa: DTZ007
# pylint: disable-next=too-many-branches
def _get_base_media_type( # noqa: C901, PLR0912
ext: str, acodec: str | None, vcodec: str | None
) -> str:
"""Get media type, without parameters, from youtube-dl JSON entry info."""
media_type = 'audio/' if acodec and not vcodec else 'video/'
if ext == '3g2':
media_type += '3gpp2'
elif ext == '3gp':
media_type += '3gpp'
elif ext == 'avi':
media_type = 'video/vnd.avi'
elif ext in (
'f4a',
'f4b',
'f4p',
'm4a',
'm4b',
'm4p',
'm4r',
):
# These extensions are intended for audio.
# If codecs are not known, assume it is audio.
if not acodec and not vcodec:
media_type = 'audio/mp4'
else:
media_type += 'mp4'
elif ext in ('f4v', 'm4v'):
media_type += 'mp4'
elif ext == 'flv':
media_type = 'video/x-flv'
elif ext == 'gif':
media_type = 'image/gif'
elif ext in ('mk3d', 'mks', 'mkv'):
media_type += 'x-matroska'
elif ext == 'mka':
# This extension is intended for audio.
# If codecs are not known, assume it is audio.
if not acodec and not vcodec:
media_type = 'audio/'
media_type += 'x-matroska'
elif ext == 'mp3':
media_type = 'audio/mpeg'
elif ext == 'ogg':
# Xiph recommends this extension for (vorbis) audio and ogv for video.
# If video codec not known, assume it is audio.
if not vcodec:
media_type = 'audio/'
media_type += 'ogg'
elif ext == 'ogv':
media_type += 'ogg'
elif ext == 'wav':
media_type = 'audio/vnd.wave'
else:
media_type += ext
return media_type
[docs]
def get_entry_media_type(entry: YtdlFormat) -> str:
    """
    Get media type (i.e. MIME type) from youtube-dl JSON entry info.

    The result is the base media type for the extension and codecs of
    ``entry``, followed by a ``codecs`` parameter when a codec is known and
    the type is not one of flv, gif, or mp3.

    :param entry: Entry or format for which to get a media type.
    :return: Media type suitable for ``entry``
    """
    ext = entry['ext']
    acodec = entry.get('acodec')
    vcodec = entry.get('vcodec')

    # A codec of 'none' is handled the same as a missing codec.
    if acodec == 'none':
        acodec = None
    if vcodec == 'none':
        vcodec = None

    if ext == 'opus':
        # Note: ext: opus could be used to refer to "raw" audio/opus.
        # However, this has not been observed on ytdl-supported sites.
        # Since Xiph recommends .opus for Opus-in-Ogg
        # https://wiki.xiph.org/index.php/MIMETypesCodecs
        # and the ytdl extractor for media.ccc.de uses it this way,
        # unconditionally convert to ogg.
        # If uses of audio/opus are found, consider how to differentiate.
        ext = 'ogg'
        if acodec is None:
            acodec = 'opus'

    base_type = _get_base_media_type(ext, acodec, vcodec)

    if not (acodec or vcodec) or ext in ('flv', 'gif', 'mp3'):
        # No codecs parameter to add.
        return base_type

    # Add codecs parameter from https://tools.ietf.org/html/rfc6381
    # Some extractors (e.g. media.ccc.de) use vcodec: h264
    # Section 3.3 of RFC 6381 specifies codecs must be a FOURCC
    if vcodec == 'h264':
        vcodec = 'avc1'

    if acodec and vcodec:
        # Note: Add space after , as in RFC 6381 section 3.6 Examples
        # TODO: Apply encoding from RFC 2231 if required, see examples
        #       in RFC 6381 section 3.1
        codecs_value = ''.join(('"', vcodec, ', ', acodec, '"'))
    else:
        codecs_value = cast('str', acodec or vcodec)

    # Note: Add space after ; as in RFC 6381 section 3.6 Examples
    return base_type + '; codecs=' + codecs_value
def _guess_entry_filename(entry: YtdlEntry) -> str:
"""
Guess the file name to which youtube-dl would download a JSON entry.
:param entry: Entry for which to guess the file name.
:return: A file name to which ``entry`` would be saved by youtube-dl.
"""
return f'{entry["fulltitle"]}-{entry["id"]}.{entry["ext"]}'
def _write_explicit_for_age_limit(
write: Callable[[str], Any],
age_limit: int,
) -> None:
"""
Write an appropriate <itunes:explicit> tag based on age_limit.
Currently this function considers any age limit to be explicit.
Standards have differed over valid itunes:explicit values:
- Spotify 1.6 wanted yes/no/clean for item, yes/clean for channel.
- Spotify 1.10 wants clean/yes/no/true/false.
- Google wanted yes or absent.
- Apple wants true/false,
- W3C Feed Validator wanted yes/no/clean, now true/false
- https://github.com/w3c/feedvalidator/issues/112
:param write: Function called to write RSS data.
:param age_limit: Age limit from youtube-dl info.
"""
write('<itunes:explicit>')
# Note: newgrounds has 13 (t/teen/PG-13) and 17 (m/mature/R)
# Both probably qualify as explicit on iTunes
write('true' if age_limit > 0 else 'false')
write('</itunes:explicit>')
# pylint: disable-next=too-many-branches,too-many-locals,too-many-statements
def _playlist_to_rss_language(
playlist: YtdlPlaylist,
write: Callable[[str], Any],
indent: str | None = None,
) -> None:
"""
Write language RSS tag for youtube-dl playlist info object.
:param playlist: Playlist for which to generate RSS.
:param write: Function called to write RSS data.
:param indent: Indent to apply to each nesting level of RSS.
"""
if indent is None:
indent2 = ''
eol = ''
else:
indent2 = indent * 2
eol = '\n'
languages = {
entry.get('language')
for entry in playlist['entries']
if entry.get('language') is not None
}
if len(languages) == 1:
for language in languages:
if isinstance(language, str):
write(indent2)
write('<language>')
write(language)
write('</language>')
write(eol)
# pylint: disable-next=too-many-branches,too-many-locals,too-many-statements
def _load_json(json_path: str) -> Any: # noqa: ANN401
"""Load JSON from a file with a given path."""
# Note: Binary so load can detect encoding (as in Section 3 of RFC 4627)
with open(json_path, 'rb') as json_file:
try:
return json.load(json_file)
except ValueError as ex:
raise ValueError('Error loading ' + json_path) from ex
[docs]
def entries_to_playlist(entries: list[YtdlEntry]) -> YtdlPlaylist:
    """
    Combine youtube-dl entries into a playlist with common metadata.

    Playlist metadata (``playlist_id``, ``playlist_title``,
    ``playlist_uploader`` and ``playlist_uploader_id``) is copied to the
    playlist, without the ``playlist_`` prefix, when every entry which
    carries such metadata carries the same values.  Entries without any
    playlist metadata do not affect the outcome.

    :param entries: Entries to combine into a playlist.
    :return: A playlist with entries from ``entries``.
    """
    prefix = 'playlist_'
    wanted = {
        prefix + 'id',
        prefix + 'title',
        prefix + 'uploader',
        prefix + 'uploader_id',
    }

    # Metadata shared by the entries seen so far; empty when none (yet) or
    # when the entries disagree.
    shared: dict[str, object] = {}
    for entry in entries:
        found = {
            name: value
            for name, value in entry.items()
            if value and name in wanted
        }
        if not found:
            continue
        if not shared:
            shared = found
        elif found != shared:
            # playlist metadata differs between entries
            shared = {}
            break

    # Chop "playlist_" from entry playlist keys for use as playlist keys
    playlist = {
        name.removeprefix(prefix): value for name, value in shared.items()
    }
    playlist['_type'] = 'playlist'
    playlist['entries'] = entries
    return cast('YtdlPlaylist', playlist)
def _load_info(info_paths: Iterable[str]) -> YtdlPlaylist:
"""Load youtube-dl JSON info files into a single playlist object."""
entries: list[YtdlEntry] = []
info_count = 0
last_playlist: YtdlPlaylist | None = None
for info_path in info_paths:
info_count += 1
if info_path == '-':
info = json.load(sys.stdin)
else:
info = _load_json(info_path)
info_entries = info.get('entries')
has_entries = isinstance(info_entries, list)
has_formats = isinstance(info.get('formats'), list)
if has_entries == has_formats:
raise ValueError('Unrecognized JSON in ' + info_path)
if has_formats:
# info for a single video
info[_JSON_PATH_KEY] = info_path
entries.append(info)
else:
# info for a playlist
last_playlist = info
info[_JSON_PATH_KEY] = info_path
for entry in info_entries:
entry[_JSON_PATH_KEY] = info_path
entries.extend(info_entries)
# If the user provided a single playlist, use it as-is
# This lets users easily specify whatever metadata they'd like
if info_count == 1 and last_playlist:
return last_playlist
return entries_to_playlist(entries)
# pylint: disable-next=too-many-branches
def info_to_rss(
info_paths: Iterable[str],
rss_url: str | None = None,
rss_path: str | None = None,
indent: str | None = None,
write: Callable[[str], Any] | None = None,
) -> None:
"""
Convert youtube-dl info JSON files to podcast RSS.
:param info_paths: Path to youtube-dl info JSON files.
:param rss_url: URL of RSS file being written.
:param rss_path: Path of RSS file to produce.
:param indent: Indent to apply to each nesting level of RSS.
:param write: Function called to write RSS data, instead of using rss_path.
:raises ValueError: if ``rss_path`` and ``sys.stdout`` are ``None``
"""
if not rss_url or not urlparse(rss_url).scheme:
# Note: Not just a spec compliance issue. Affects real aggregators:
# https://github.com/AntennaPod/AntennaPod/issues/2880
_logger.warning(
'URLs in RSS 2.0 must be absolute (i.e. start with a scheme) per:\n'
'- https://www.rssboard.org/rss-specification#comments\n'
'- https://cyber.harvard.edu/rss/rss.html#comments\n'
'- https://validator.w3.org/feed/docs/error/InvalidURLAttribute.html\n'
'The provided self URL (%s) is not.',
rss_url,
)
# Note: Could use default locale.getpreferredencoding(). Many users would
# "prefer" ISO-8859-1. UTF-8 is a safer default to support more characters
# and for wider podcast distributor/aggregator support.
# (e.g. Apple instructs podcasters to use UTF-8.)
encoding = 'UTF-8'
output: TextIOBase | None = None
if write:
pass
elif rss_path:
# pylint: disable-next=consider-using-with
output = open(rss_path, 'w', encoding=encoding) # noqa: SIM115
write = output.write
elif sys.stdout is None:
raise ValueError('stdout is closed')
elif sys.stdout.isatty():
# TTY unlikely to interpret XML declaration. Use Python's encoding.
if sys.stdout.encoding is not None:
# pylint: disable-next=redefined-variable-type
encoding = sys.stdout.encoding
write = sys.stdout.write
else:
encoding = locale.getpreferredencoding()
write = codecs.getwriter(encoding)(sys.stdout).write
elif sys.stdout.encoding and sys.stdout.encoding.upper() == encoding:
write = sys.stdout.write
elif hasattr(sys.stdout, 'buffer'):
write = codecs.getwriter(encoding)(sys.stdout.buffer).write
else:
write = codecs.getwriter(encoding)(cast('IO[bytes]', sys.stdout)).write
try:
write('<?xml version="1.0" encoding=')
write(quoteattr(encoding))
write('?>')
if indent is not None:
write('\n')
playlist_to_rss(
_load_info(info_paths),
write,
rss_url,
rss_path,
indent=indent,
)
finally:
if output:
output.close()
def _parse_indent(indent: str | int) -> str:
"""Parse indent argument to indent string."""
try:
return ' ' * int(indent)
except ValueError:
return cast('str', indent)
def _build_argument_parser(
    **kwargs: Any,  # noqa: ANN401
) -> argparse.ArgumentParser:
    """
    Build parser for command line options.

    :param kwargs: Additional keyword arguments passed on to
        :py:class:`argparse.ArgumentParser`.
    :return: argument parser
    """
    parser = argparse.ArgumentParser(
        usage='%(prog)s [options] <JSON file...>',
        description=__doc__,
        # Use raw formatter to avoid mangling version text
        formatter_class=argparse.RawDescriptionHelpFormatter,
        **kwargs,
    )

    # (names or flags, add_argument keyword arguments), in the order in
    # which the arguments are registered with the parser.
    argument_specs: tuple[tuple[tuple[str, ...], dict[str, Any]], ...] = (
        (
            ('-i', '--indent'),
            {
                'help': 'XML indent string, or number of spaces to indent',
                'nargs': '?',
                'type': _parse_indent,
            },
        ),
        (
            ('-o', '--output'),
            {'help': 'Output RSS file.'},
        ),
        (
            ('-q', '--quiet'),
            {
                'action': 'count',
                'help': 'Decrease verbosity (less detailed output)',
            },
        ),
        (
            ('-S', '--self-url'),
            {'help': 'URL of generated RSS, to resolve relative URLs'},
        ),
        (
            ('-v', '--verbose'),
            {
                'action': 'count',
                'help': 'Increase verbosity (more detailed output)',
            },
        ),
        (
            ('-V', '--version'),
            {
                'action': 'version',
                'help': 'Output version and license information',
                'version': _VERSION_MESSAGE,
            },
        ),
        (
            ('json_files',),
            {
                'nargs': '+',
                'metavar': 'JSON file...',
                'help': 'youtube-dl .info.json files',
            },
        ),
    )
    for names, options in argument_specs:
        parser.add_argument(*names, **options)

    return parser
[docs]
def main(argv: Sequence[str] = sys.argv) -> int:
    """
    Entry point for command-line use.

    :param argv: command-line arguments (usually :py:data:`sys.argv`)
    :return: exit code
    """
    parser = _build_argument_parser(prog=_PROG_NAME)

    if _HAVE_AUTOCOMPLETE:
        # Record the exit code requested by argcomplete so that it is
        # returned from this function.
        completion_status: int | None = None

        def record_exit(code: int = 0) -> None:
            nonlocal completion_status
            completion_status = code

        autocomplete(parser, exit_method=record_exit)
        if completion_status is not None:
            return completion_status

    options = parser.parse_args(args=argv[1:])

    # Set log level based on verbosity requested (default of INFO).
    # Each -q raises the threshold by one level, each -v lowers it by one.
    quiet_count = options.quiet or 0
    verbose_count = options.verbose or 0
    log_level = logging.INFO + (quiet_count - verbose_count) * 10
    logging.basicConfig(format='%(levelname)s: %(message)s', level=log_level)

    # Log version to aid debugging
    _logger.debug('ytdl2rss %s', __version__)

    try:
        info_to_rss(
            options.json_files,
            rss_url=options.self_url,
            rss_path=options.output,
            indent=options.indent,
        )
    except UnicodeEncodeError:
        # TODO: Should use a proper XML writer which would represent
        #       characters outside the file encoding using XML entities.
        traceback.print_exc()
        sys.stderr.write(
            'Consider specifying a different encoding in PYTHONIOENCODING.\n'
        )
        return 1
    else:
        return 0
# Run main() and use its return value as the process exit status when this
# file is executed as a script.
if __name__ == '__main__':
    sys.exit(main())