Audio changes (#5593)

* Squash tested commits

* remove the code jack is concerned about

* Apply suggestions from code review

* more log lines

* more log lines

* format

* formatting

* style(Rename Xms and Xmx mentions): Rename Xms and Xmx to more user-friendly names

- Change Xms to "Initial Heapsize"
- Change Xmx to "Max Heapsize"

Signed-off-by: Draper <27962761+Drapersniper@users.noreply.github.com>
This commit is contained in:
Draper
2022-03-28 16:23:30 +01:00
committed by GitHub
parent 5a5b22003f
commit 9ec85d4819
28 changed files with 1629 additions and 599 deletions

View File

@@ -1,24 +1,52 @@
import asyncio
import asyncio.subprocess # disables for # https://github.com/PyCQA/pylint/issues/1469
import contextlib
import itertools
import json
import pathlib
import platform
import re
import shlex
import shutil
import tempfile
import time
from typing import ClassVar, Final, List, Optional, Pattern, Tuple
from typing import ClassVar, Final, List, Optional, Pattern, Tuple, TYPE_CHECKING
import aiohttp
import lavalink
import psutil
import rich.progress
import yaml
from discord.backoff import ExponentialBackoff
from red_commons.logging import getLogger
from redbot.core import data_manager
from redbot.core import data_manager, Config
from redbot.core.i18n import Translator
from .errors import LavalinkDownloadFailed
from .utils import task_callback_exception
from .errors import (
LavalinkDownloadFailed,
InvalidArchitectureException,
ManagedLavalinkAlreadyRunningException,
ManagedLavalinkPreviouslyShutdownException,
UnsupportedJavaException,
ManagedLavalinkStartFailure,
UnexpectedJavaResponseException,
EarlyExitException,
ManagedLavalinkNodeException,
TooManyProcessFound,
IncorrectProcessFound,
NoProcessFound,
NodeUnhealthy,
)
from .utils import (
change_dict_naming_convention,
get_max_allocation_size,
replace_p_with_prefix,
)
from ...core.utils import AsyncIter
if TYPE_CHECKING:
from . import Audio
_ = Translator("Audio", pathlib.Path(__file__))
log = getLogger("red.Audio.manager")
@@ -31,7 +59,6 @@ LAVALINK_DOWNLOAD_URL: Final[str] = (
)
# Data directory of the "Audio" cog; the managed node is launched with this as its cwd.
LAVALINK_DOWNLOAD_DIR: Final[pathlib.Path] = data_manager.cog_data_path(raw_name="Audio")
# Location of the Lavalink jar inside the download directory.
LAVALINK_JAR_FILE: Final[pathlib.Path] = LAVALINK_DOWNLOAD_DIR / "Lavalink.jar"
# application.yml that lives in the "data" directory next to this module.
BUNDLED_APP_YML: Final[pathlib.Path] = pathlib.Path(__file__).parent / "data" / "application.yml"
# application.yml inside the download directory (next to the jar).
LAVALINK_APP_YML: Final[pathlib.Path] = LAVALINK_DOWNLOAD_DIR / "application.yml"
# Bytes pattern searched for in the node's output to detect that it finished starting.
_RE_READY_LINE: Final[Pattern] = re.compile(rb"Started Launcher in \S+ seconds")
@@ -98,12 +125,16 @@ class ServerManager:
_buildtime: ClassVar[Optional[str]] = None
_java_exc: ClassVar[str] = "java"
def __init__(self, config: Config, cog: "Audio", timeout: Optional[int] = None) -> None:
    """Create a manager for a single managed Lavalink node process.

    Parameters
    ----------
    config:
        Config object of the cog; its ``yaml`` and ``java`` groups are read
        when the node is started.
    cog:
        The owning Audio cog. Its ``bot`` attribute is used to format
        user-facing messages and the monitor reports fatal start-up
        problems back to it.
    timeout:
        Timeout (seconds) handed to ``asyncio.wait_for`` while waiting for
        the node to become ready. ``None`` means wait without a limit.
    """
    # Set once the node has printed its ready line, cleared on shutdown.
    self.ready: asyncio.Event = asyncio.Event()
    self._config = config
    self._proc: Optional[asyncio.subprocess.Process] = None  # pylint:disable=no-member
    self._monitor_task: Optional[asyncio.Task] = None
    # PID of the running node; used to check liveness and to terminate it.
    self._node_pid: Optional[int] = None
    self._shutdown: bool = False
    # Task running ``start_monitor``; created by ``start``.
    self.start_monitor_task = None
    self.timeout = timeout
    self.cog = cog
    # Last command line built by ``_get_jar_args``.
    self._args = []
@property
def path(self) -> Optional[str]:
@@ -129,70 +160,122 @@ class ServerManager:
def build_time(self) -> Optional[str]:
return self._buildtime
async def _start(self, java_path: str) -> None:
    """Launch the managed Lavalink node and wait until it reports ready.

    Writes the node settings, downloads the jar if needed, builds the Java
    command line, spawns the process and then waits (up to ``self.timeout``)
    for the launcher's ready line.

    Parameters
    ----------
    java_path:
        Java executable to run the node with.

    Raises
    ------
    InvalidArchitectureException
        If ``platform.machine()`` is one of the blacklisted architectures.
    ManagedLavalinkAlreadyRunningException
        If a previously spawned process is still running.
    ManagedLavalinkPreviouslyShutdownException
        If this manager was already shut down after spawning a process.

    Any other exception raised while spawning or waiting is re-raised after
    the node has been torn down.
    """
    arch_name = platform.machine()
    self._java_exc = java_path
    if arch_name in self._blacklisted_archs:
        raise InvalidArchitectureException(
            "You are attempting to run the managed Lavalink node on an unsupported machine architecture."
        )

    if self._proc is not None:
        if self._proc.returncode is None:
            raise ManagedLavalinkAlreadyRunningException(
                "Managed Lavalink node is already running"
            )
        elif self._shutdown:
            raise ManagedLavalinkPreviouslyShutdownException(
                "Server manager has already been used - create another one"
            )

    # application.yml is generated from the stored settings before launch.
    await self.process_settings()
    await self.maybe_download_jar()

    args, msg = await self._get_jar_args()
    if msg is not None:
        log.warning(msg)
    command_string = shlex.join(args)
    log.info("Managed Lavalink node startup command: %s", command_string)
    if "-Xmx" not in command_string and msg is None:
        log.warning(
            await replace_p_with_prefix(
                self.cog.bot,
                "Managed Lavalink node maximum allowed RAM not set or higher than available RAM, "
                "please use '[p]llset heapsize' to set a maximum value to avoid out of RAM crashes.",
            )
        )
    try:
        self._proc = (
            await asyncio.subprocess.create_subprocess_exec(  # pylint:disable=no-member
                *args,
                cwd=str(LAVALINK_DOWNLOAD_DIR),
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.STDOUT,
            )
        )
        self._node_pid = self._proc.pid
        log.info("Managed Lavalink node started. PID: %s", self._node_pid)
        try:
            await asyncio.wait_for(self._wait_for_launcher(), timeout=self.timeout)
        except asyncio.TimeoutError:
            log.warning(
                "Timeout occurred whilst waiting for managed Lavalink node to be ready"
            )
            raise
    except asyncio.TimeoutError:
        # NOTE(review): the timeout is deliberately not propagated here; the
        # node is torn down and this method returns normally with ``ready``
        # cleared, so the caller only notices on its next readiness wait.
        await self._partial_shutdown()
    except Exception:
        await self._partial_shutdown()
        raise
async def process_settings(self):
    """Write the stored ``yaml`` settings group out as the node's application.yml.

    The settings are passed through ``change_dict_naming_convention`` before
    being dumped (presumably to map config key names onto Lavalink's own
    naming; confirm against the helper).
    """
    stored_settings = await self._config.yaml.all()
    converted = change_dict_naming_convention(stored_settings)
    with open(LAVALINK_APP_YML, "w") as app_yml:
        yaml.safe_dump(converted, app_yml)
async def _get_jar_args(self) -> Tuple[List[str], Optional[str]]:
    """Build the command line used to launch the Lavalink jar.

    Returns
    -------
    Tuple[List[str], Optional[str]]
        The argument list (Java executable first, ``-jar <path>`` last) and
        an optional warning message, set when the configured maximum heap
        size was not applied.

    Raises
    ------
    UnsupportedJavaException
        If ``_has_java`` reports that no usable Java is available.
    """
    java_available, _detected_version = await self._has_java()

    if not java_available:
        if self._java_version is None:
            extras = ""
        else:
            extras = f" however you have version {self._java_version} (executable: {self._java_exc})"
        raise UnsupportedJavaException(
            await replace_p_with_prefix(
                self.cog.bot,
                f"The managed Lavalink node requires Java 11 to run{extras};\n"
                "Either install version 11 and restart the bot or connect to an external Lavalink node "
                "(https://docs.discord.red/en/stable/install_guides/index.html)\n"
                "If you already have Java 11 installed then you will need to specify the executable path, "
                "use '[p]llset java' to set the correct Java 11 executable.",
            )  # TODO: Replace with Audio docs when they are out
        )

    # NOTE(review): relies on the ``java`` config group yielding its values in
    # (initial heap, max heap) order - confirm against the registered defaults.
    java_xms, java_xmx = list((await self._config.java.all()).values())
    # Max heap must look like "<digits>M" or "<digits>G" (case-insensitive).
    match = re.match(r"^(\d+)([MG])$", java_xmx, flags=re.IGNORECASE)
    command_args = [
        self._java_exc,
        "-Djdk.tls.client.protocols=TLSv1.2",
        f"-Xms{java_xms}",
    ]
    # Placeholder used when the allocation limit is never queried (no match).
    meta = 0, None
    invalid = None
    if match and (
        # Requested size converted to bytes: M -> 1024**2, G -> 1024**3.
        (int(match.group(1)) * 1024 ** (2 if match.group(2).lower() == "m" else 3))
        < (meta := get_max_allocation_size(self._java_exc))[0]
    ):
        command_args.append(f"-Xmx{java_xmx}")
    elif meta[0] is not None:
        invalid = await replace_p_with_prefix(
            self.cog.bot,
            "Managed Lavalink node RAM allocation ignored due to system limitations, "
            "please fix this by setting the correct value with '[p]llset heapsize'.",
        )

    command_args.extend(["-jar", str(LAVALINK_JAR_FILE)])
    self._args = command_args
    return command_args, invalid
async def _has_java(self) -> Tuple[bool, Optional[Tuple[int, int]]]:
    """Check whether a supported Java (11.x) executable is available.

    Returns
    -------
    Tuple[bool, Optional[Tuple[int, int]]]
        ``(available, version)``. ``version`` is ``None`` when the
        executable could not be found on the system.
    """
    if self._java_available:
        # Only a positive result is cached; a negative one is re-checked so
        # that a changed executable path is picked up on the next attempt.
        return self._java_available, self._java_version

    java_exec = shutil.which(self._java_exc)
    java_available = java_exec is not None
    if not java_available:
        self._java_available = False
        self._java_version = None
    else:
        self._java_version = await self._get_java_version()
        # Supported range is [11.0, 12.0).
        self._java_available = (11, 0) <= self._java_version < (12, 0)
        self._java_exc = java_exec
    return self._java_available, self._java_version
@@ -224,58 +307,49 @@ class ServerManager:
return major, minor
raise RuntimeError(f"The output of `{self._java_exc} -version` was unexpected.")
raise UnexpectedJavaResponseException(
f"The output of `{self._java_exc} -version` was unexpected\n{version_info}."
)
async def _wait_for_launcher(self) -> None:
    """Read the node's output until it is ready, fails to start, or exits.

    Sets ``self.ready`` when the ready line is seen.

    Raises
    ------
    ManagedLavalinkStartFailure
        If a line matching ``_FAILED_TO_START`` is read.
    EarlyExitException
        If the process has already exited before reporting ready.
    """
    log.info("Waiting for Managed Lavalink node to be ready")
    for i in itertools.cycle(range(50)):
        line = await self._proc.stdout.readline()
        if _RE_READY_LINE.search(line):
            self.ready.set()
            log.info("Managed Lavalink node is ready to receive requests.")
            break
        if _FAILED_TO_START.search(line):
            raise ManagedLavalinkStartFailure(
                f"Lavalink failed to start: {line.decode().strip()}"
            )
        if self._proc.returncode is not None:
            raise EarlyExitException("Managed Lavalink node server exited early.")
        if i == 49:
            # Sleep after 50 lines to prevent busylooping
            await asyncio.sleep(0.1)
async def _monitor(self) -> None:
    """Poll the node process and react once it has exited.

    When the process exits, the node is restarted through ``self.start``
    unless a JVM crash log for its PID exists, in which case a critical
    message is logged instead.

    NOTE(review): this looks like the pre-change monitor; ``start_monitor``
    also supervises the node - confirm whether this method is still used.
    """
    while True:
        if self._proc.returncode is not None:
            break
        await asyncio.sleep(0.5)

    # This task hasn't been cancelled - Lavalink was shut down by something else
    log.warning("Internal Lavalink jar shutdown unexpectedly")
    if self._has_java_error():
        log.critical(
            "Your Java is borked. Please find the hs_err_pid%d.log file"
            " in the Audio data folder and report this issue.",
            self._proc.pid,
        )
        return
    log.info("Restarting internal Lavalink server")
    await self.start(self._java_exc)
def _has_java_error(self) -> bool:
    """Return whether an ``hs_err_pid<PID>.log`` file exists for the node process."""
    node_pid = self._proc.pid
    crash_log = LAVALINK_DOWNLOAD_DIR.joinpath("hs_err_pid{}.log".format(node_pid))
    return crash_log.exists()
async def shutdown(self) -> None:
    """Stop supervising the node and shut it down.

    Cancels the monitor task created by ``start`` (if any) before tearing
    the node down, so the monitor cannot restart it.
    """
    if self.start_monitor_task is not None:
        self.start_monitor_task.cancel()
    await self._partial_shutdown()
async def _partial_shutdown(self) -> None:
    """Tear down the node process without cancelling the monitor task.

    Always clears ``self.ready``. If the manager is not already shut down,
    the process recorded in ``self._node_pid`` is terminated and the
    process bookkeeping is reset.
    """
    self.ready.clear()
    # In certain situations to await self._proc.wait() is invalid so waiting on it waits forever.
    if self._shutdown is True:
        # For convenience, calling this method more than once or calling it before starting it
        # does nothing.
        return
    if self._node_pid:
        # Best effort: psutil errors raised while stopping the process are ignored.
        with contextlib.suppress(psutil.Error):
            p = psutil.Process(self._node_pid)
            p.terminate()
            p.kill()
    self._proc = None
    self._shutdown = True
    self._node_pid = None
async def _download_jar(self) -> None:
log.info("Downloading Lavalink.jar...")
@@ -327,7 +401,7 @@ class ServerManager:
if self._up_to_date is True:
# Return cached value if we've checked this before
return True
args = await self._get_jar_args()
args, _ = await self._get_jar_args()
args.append("--version")
_proc = await asyncio.subprocess.create_subprocess_exec( # pylint:disable=no-member
*args,
@@ -366,3 +440,118 @@ class ServerManager:
async def maybe_download_jar(self):
    """Download the Lavalink jar unless it is already present and up to date."""
    # The up-to-date check only runs when the jar file exists.
    jar_is_current = LAVALINK_JAR_FILE.exists() and await self._is_up_to_date()
    if jar_is_current:
        return
    await self._download_jar()
async def wait_until_ready(self, timeout: Optional[float] = None):
    """Wait until the managed node has reported that it is ready.

    Parameters
    ----------
    timeout:
        Maximum number of seconds to wait. When ``None`` the manager's
        ``self.timeout`` is used. An explicit ``0`` is honoured and checks
        the current state without waiting.

    Raises
    ------
    asyncio.TimeoutError
        If the node is not ready within the effective timeout.
    """
    # Compare against None so that a falsy-but-valid timeout such as 0 is
    # not silently replaced by the default.
    effective_timeout = self.timeout if timeout is None else timeout
    await asyncio.wait_for(self.ready.wait(), timeout=effective_timeout)
async def start_monitor(self, java_path: str):
    """Start the managed node and keep it healthy until cancelled or aborted.

    Runs an outer loop that (re)starts the node when no live process is
    recorded, and an inner loop that health-checks it. Recoverable failures
    tear the node down and retry after an exponential backoff delay;
    unrecoverable ones set ``cog.lavalink_connection_aborted`` and shut the
    manager down.

    Parameters
    ----------
    java_path:
        Java executable forwarded to ``_start``.
    """
    backoff = ExponentialBackoff(base=7)
    while True:
        try:
            self._shutdown = False
            if self._node_pid is None or not psutil.pid_exists(self._node_pid):
                self.ready.clear()
                await self._start(java_path=java_path)
            while True:
                await self.wait_until_ready(timeout=self.timeout)
                if not psutil.pid_exists(self._node_pid):
                    raise NoProcessFound
                try:
                    node = lavalink.get_all_nodes()[0]
                    if node.ready:
                        # Hoping this throws an exception which will then trigger a restart
                        await node._ws.ping()
                        backoff = ExponentialBackoff(
                            base=7
                        )  # Reassign Backoff to reset it on successful ping.
                        # ExponentialBackoff.reset() would be a nice method to have
                        await asyncio.sleep(1)
                    else:
                        await asyncio.sleep(5)
                except IndexError:
                    # In case lavalink.get_all_nodes() returns 0 Nodes
                    # (During a connect or multiple connect failures)
                    try:
                        log.debug(
                            "Managed node monitor detected RLL is not connected to any nodes"
                        )
                        await lavalink.wait_until_ready(timeout=60, wait_if_no_node=60)
                    except asyncio.TimeoutError:
                        self.cog.lavalink_restart_connect(manual=True)
                        return  # lavalink_restart_connect will cause a new monitor task to be created.
                except Exception as exc:
                    log.debug(exc, exc_info=exc)
                    # Chain the original error so it is kept as the explicit cause.
                    raise NodeUnhealthy(str(exc)) from exc
        except (TooManyProcessFound, IncorrectProcessFound, NoProcessFound):
            # Process bookkeeping is stale; tear down and restart immediately.
            await self._partial_shutdown()
        except asyncio.TimeoutError:
            delay = backoff.delay()
            await self._partial_shutdown()
            log.warning(
                "Lavalink Managed node health check timeout, restarting in %s seconds",
                delay,
            )
            await asyncio.sleep(delay)
        except NodeUnhealthy:
            delay = backoff.delay()
            await self._partial_shutdown()
            log.warning(
                "Lavalink Managed node health check failed, restarting in %s seconds",
                delay,
            )
            await asyncio.sleep(delay)
        except LavalinkDownloadFailed as exc:
            delay = backoff.delay()
            if exc.should_retry:
                log.warning(
                    "Lavalink Managed node download failed retrying in %s seconds\n%s",
                    delay,
                    exc.response,
                )
                await self._partial_shutdown()
                await asyncio.sleep(delay)
            else:
                log.critical(
                    "Fatal exception whilst starting managed Lavalink node, "
                    "aborting...\n%s",
                    exc.response,
                )
                self.cog.lavalink_connection_aborted = True
                return await self.shutdown()
        except InvalidArchitectureException:
            log.critical("Invalid machine architecture, cannot run a managed Lavalink node.")
            self.cog.lavalink_connection_aborted = True
            return await self.shutdown()
        except (UnsupportedJavaException, UnexpectedJavaResponseException) as exc:
            log.critical(exc)
            self.cog.lavalink_connection_aborted = True
            return await self.shutdown()
        except ManagedLavalinkNodeException as exc:
            delay = backoff.delay()
            log.critical(
                exc,
            )
            await self._partial_shutdown()
            log.warning(
                "Lavalink Managed node startup failed retrying in %s seconds",
                delay,
            )
            await asyncio.sleep(delay)
        except asyncio.CancelledError:
            # Cancellation (e.g. from ``shutdown``) ends the monitor quietly.
            return
        except Exception as exc:
            delay = backoff.delay()
            log.warning(
                "Lavalink Managed node startup failed retrying in %s seconds",
                delay,
            )
            log.debug(exc, exc_info=exc)
            await self._partial_shutdown()
            await asyncio.sleep(delay)
async def start(self, java_path: str):
    """Begin supervising the managed node in a background task.

    If a monitor task was created earlier, the manager is shut down first;
    a new task running ``start_monitor`` is then stored in
    ``self.start_monitor_task``.
    """
    already_started = self.start_monitor_task is not None
    if already_started:
        await self.shutdown()
    monitor_coro = self.start_monitor(java_path)
    self.start_monitor_task = asyncio.create_task(monitor_coro)