Source code for mopidy.core._actor

# ruff: noqa: ARG002

from __future__ import annotations

import itertools
import logging
from collections.abc import Iterable
from pathlib import Path
from typing import TYPE_CHECKING

import pykka
from pykka.typing import ActorMemberMixin, proxy_method

import mopidy
from mopidy import audio, backend, mixer
from mopidy._lib import paths
from mopidy.types import PlaybackState, UriScheme

from ._history import HistoryController
from ._library import LibraryController
from ._listener import CoreListener
from ._mixer import MixerController
from ._playback import PlaybackController
from ._playlists import PlaylistsController
from ._state_storage import CoreControllersState, StoredState
from ._tracklist import TracklistController

if TYPE_CHECKING:
    from mopidy.config import Config
    from mopidy.mixer import MixerProxy
    from mopidy.types import Uri

    from ._history import HistoryControllerProxy
    from ._library import LibraryControllerProxy
    from ._mixer import MixerControllerProxy
    from ._playback import PlaybackControllerProxy
    from ._playlists import PlaylistsControllerProxy
    from ._tracklist import TracklistControllerProxy

logger = logging.getLogger(__name__)


[docs] class Core( pykka.ThreadingActor, audio.AudioListener, backend.BackendListener, mixer.MixerListener, ): library: LibraryController """An instance of :class:`~mopidy.core.LibraryController`""" history: HistoryController """An instance of :class:`~mopidy.core.HistoryController`""" mixer: MixerController """An instance of :class:`~mopidy.core.MixerController`""" playback: PlaybackController """An instance of :class:`~mopidy.core.PlaybackController`""" playlists: PlaylistsController """An instance of :class:`~mopidy.core.PlaylistsController`""" tracklist: TracklistController """An instance of :class:`~mopidy.core.TracklistController`""" def __init__( self, config: Config, *, mixer: MixerProxy | None = None, backends: Iterable[backend.BackendProxy], audio: audio.AudioProxy | None = None, ) -> None: super().__init__() self._config = config self.backends = Backends(backends or []) self.library = pykka.traversable( LibraryController(backends=self.backends, core=self), ) self.history = pykka.traversable(HistoryController()) self.mixer = pykka.traversable(MixerController(mixer=mixer)) self.playback = pykka.traversable( PlaybackController(audio=audio, backends=self.backends, core=self), ) self.playlists = pykka.traversable( PlaylistsController(backends=self.backends, core=self), ) self.tracklist = pykka.traversable(TracklistController(core=self)) self.audio = audio
[docs] def get_uri_schemes(self) -> list[UriScheme]: """Get list of URI schemes we can handle.""" futures = [b.uri_schemes for b in self.backends] results = pykka.get_all(futures) uri_schemes = itertools.chain(*results) return sorted(uri_schemes)
[docs] def get_version(self) -> str: """Get version of the Mopidy core API.""" return mopidy.__version__
def reached_end_of_stream(self) -> None: self.playback._on_end_of_stream() def stream_changed(self, uri: Uri) -> None: self.playback._on_stream_changed(uri) def position_changed(self, position: int) -> None: self.playback._on_position_changed(position) def state_changed( self, old_state: PlaybackState, new_state: PlaybackState, target_state: PlaybackState | None, ) -> None: # NOTE: This is a temporary fix for issue #232 while we wait for a more # permanent solution with the implementation of issue #234. When the # Spotify play token is lost, the Spotify backend pauses audio # playback, but mopidy.core doesn't know this, so we need to update # mopidy.core's state to match the actual state in mopidy.audio. If we # don't do this, clients will think that we're still playing. # We ignore cases when target state is set as this is buffering # updates (at least for now) and we need to get #234 fixed... if ( new_state == PlaybackState.PAUSED and not target_state and self.playback.get_state() != PlaybackState.PAUSED ): self.playback.set_state(new_state) self.playback._trigger_track_playback_paused() def playlists_loaded(self) -> None: # Forward event from backend to frontends CoreListener.send("playlists_loaded") def volume_changed(self, volume: int) -> None: # Forward event from mixer to frontends CoreListener.send("volume_changed", volume=volume) def mute_changed(self, mute: bool) -> None: # Forward event from mixer to frontends CoreListener.send("mute_changed", mute=mute) def tags_changed(self, tags: set[str]) -> None: if not self.audio or "title" not in tags: return current_tags = self.audio.get_current_tags().get() if not current_tags: return self.playback._stream_title = None # TODO: Do not emit stream title changes for plain tracks. We need a # better way to decide if something is a stream. if current_tags.get("title"): title = current_tags["title"][0] current_track = self.playback.get_current_track() if current_track is not None and current_track.name != title: self.playback._stream_title = title CoreListener.send("stream_title_changed", title=title) def _setup(self) -> None: """Do not call this function. It is for internal use at startup.""" try: coverage = [] if ( self._config and "restore_state" in self._config["core"] and self._config["core"]["restore_state"] ): coverage = [ "tracklist", "mode", "play-last", "mixer", "history", ] if coverage: self._load_state(coverage) except Exception as e: # noqa: BLE001 logger.warning("Restore state: Unexpected error: %s", str(e)) def _teardown(self) -> None: """Do not call this function. It is for internal use at shutdown.""" try: if ( self._config and "restore_state" in self._config["core"] and self._config["core"]["restore_state"] ): self._save_state() except Exception as e: # noqa: BLE001 logger.warning("Unexpected error while saving state: %s", str(e)) def _get_data_dir(self) -> Path: # get or create data director for core data_dir_path = paths.expand_path(self._config["core"]["data_dir"]) / "core" paths.get_or_create_dir(data_dir_path) return data_dir_path def _get_state_file(self) -> Path: return self._get_data_dir() / "state.json.gz" def _save_state(self) -> None: """Save current state to disk.""" state_file = self._get_state_file() logger.info("Saving state to %s", state_file) state = StoredState( version=mopidy.__version__, state=CoreControllersState( tracklist=self.tracklist._save_state(), history=self.history._save_state(), playback=self.playback._save_state(), mixer=self.mixer._save_state(), ), ) state.dump(state_file) logger.debug("Saving state done") def _load_state(self, coverage: Iterable[str]) -> None: """Restore state from disk. Load state from disk and restore it. Parameter ``coverage`` limits the amount of data to restore. Possible values for ``coverage`` (list of one or more of): - 'tracklist' fill the tracklist - 'mode' set tracklist properties (consume, random, repeat, single) - 'play-last' restore play state ('tracklist' also required) - 'mixer' set mixer volume and mute state - 'history' restore history :param coverage: amount of data to restore :type coverage: list of strings """ state_file = self._get_state_file() logger.info("Loading state from %s", state_file) data = StoredState.load(state_file) try: # Try only once. If something goes wrong, the next start is clean. state_file.unlink() except OSError: logger.info("Failed to delete %s", state_file) if data is not None: self.history._load_state(data.state.history, coverage) self.tracklist._load_state(data.state.tracklist, coverage) self.mixer._load_state(data.state.mixer, coverage) # playback after tracklist self.playback._load_state(data.state.playback, coverage) logger.debug("Loading state done")
class Backends(list): def __init__(self, backends: Iterable[backend.BackendProxy]) -> None: super().__init__(backends) self.with_library: dict[UriScheme, backend.BackendProxy] = {} self.with_library_browse: dict[UriScheme, backend.BackendProxy] = {} self.with_playback: dict[UriScheme, backend.BackendProxy] = {} self.with_playlists: dict[UriScheme, backend.BackendProxy] = {} backends_by_scheme: dict[UriScheme, backend.BackendProxy] = {} def name(backend_proxy: backend.BackendProxy) -> str: return backend_proxy.actor_ref.actor_class.__name__ for b in backends: try: has_library = b.has_library().get() has_library_browse = b.has_library_browse().get() has_playback = b.has_playback().get() has_playlists = b.has_playlists().get() except Exception: self.remove(b) logger.exception("Fetching backend info for %s failed", name(b)) continue for scheme in b.uri_schemes.get(): if scheme in backends_by_scheme: msg = ( f"Cannot add URI scheme {scheme!r} for {name(b)}, " f"it is already handled by {name(backends_by_scheme[scheme])}" ) raise AssertionError(msg) backends_by_scheme[scheme] = b if has_library: self.with_library[scheme] = b if has_library_browse: self.with_library_browse[scheme] = b if has_playback: self.with_playback[scheme] = b if has_playlists: self.with_playlists[scheme] = b class CoreProxy(ActorMemberMixin, pykka.ActorProxy[Core]): library: LibraryControllerProxy history: HistoryControllerProxy mixer: MixerControllerProxy playback: PlaybackControllerProxy playlists: PlaylistsControllerProxy tracklist: TracklistControllerProxy get_uri_schemes = proxy_method(Core.get_uri_schemes) get_version = proxy_method(Core.get_version) reached_end_of_stream = proxy_method(Core.reached_end_of_stream) stream_changed = proxy_method(Core.stream_changed) position_changed = proxy_method(Core.position_changed) state_changed = proxy_method(Core.state_changed) playlists_loaded = proxy_method(Core.playlists_loaded) volume_changed = proxy_method(Core.volume_changed) mute_changed = proxy_method(Core.mute_changed) tags_changed = proxy_method(Core.tags_changed)