init: added bootstrapping files

This commit is contained in:
François-Xavier Wicht 2026-04-02 17:36:55 +02:00
commit e8322c5ce5
7 changed files with 530 additions and 0 deletions

24
.env.example Normal file
View File

@ -0,0 +1,24 @@
# Your Signal phone number (with country code, e.g. +33123456789)
SIGNAL_PHONE=+33123456789
# signal-cli daemon address (default: localhost:7583)
SIGNAL_HOST=localhost
SIGNAL_PORT=7583
# Qobuz credentials
# App ID and secret are fetched automatically from the Qobuz web app (via qobuz-dl's Bundle)
QOBUZ_EMAIL=your@email.com
QOBUZ_PASSWORD=yourpassword
# Download quality: 5=MP3 320, 6=FLAC 16-bit, 7=FLAC 24-bit/96kHz, 27=FLAC 24-bit/192kHz
QOBUZ_QUALITY=27
# Where to save downloaded music
DOWNLOAD_DIR=~/Music
# For artist downloads: how many recent albums to grab when user picks "top N"
MAX_ARTIST_ALBUMS=5
# How many results to show per category in search
SEARCH_ARTIST_LIMIT=3
SEARCH_ALBUM_LIMIT=5

207
bot.py Normal file
View File

@ -0,0 +1,207 @@
"""
Signal-Qobuz bot main entry point.
Usage:
python bot.py
"""
import asyncio
import logging
import config
import qobuz_client
from qobuz_client import AlbumItem
from session import PendingArtist, ResultEntry, State, UserSession
from signal_client import SignalClient
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
logger = logging.getLogger(__name__)
_sessions: dict[str, UserSession] = {}
def _get_session(number: str) -> UserSession:
if number not in _sessions:
_sessions[number] = UserSession()
return _sessions[number]
def _results_message(entries: list[ResultEntry]) -> str:
lines = ["Here's what I found:\n"]
for i, e in enumerate(entries, 1):
lines.append(f"[{i}] {e.display}")
lines.append("\nReply with a number to download, or send a new search.")
return "\n".join(lines)
def _artist_options_message(name: str, total: int) -> str:
return (
f"{name}{total} album(s) on Qobuz.\n\n"
f"[1] Full discography ({total} albums)\n"
f"[2] Top {config.MAX_ARTIST_ALBUMS} most recent albums\n\n"
"Reply with 1 or 2, or send a new search."
)
async def handle_message(signal: SignalClient, sender: str, text: str) -> None:
session = _get_session(sender)
text = text.strip()
if session.state == State.DOWNLOADING:
await signal.send_message(sender, "A download is already in progress. Please wait.")
return
# ── ARTIST OPTIONS: waiting for "1" or "2" ──────────────────────────────
if session.state == State.ARTIST_OPTIONS and session.pending_artist:
if text == "1":
pending = session.pending_artist
session.state = State.DOWNLOADING
await signal.send_message(
sender, f"Downloading full discography: {pending.name}"
)
await _download_album_list(signal, sender, session, pending.albums)
return
if text == "2":
pending = session.pending_artist
albums = pending.albums[:config.MAX_ARTIST_ALBUMS]
session.state = State.DOWNLOADING
await signal.send_message(
sender,
f"Downloading {len(albums)} most recent album(s) by {pending.name}",
)
await _download_album_list(signal, sender, session, albums)
return
# Anything else → treat as a new search
# ── RESULTS: waiting for a number ────────────────────────────────────────
if session.state == State.RESULTS and text.isdigit():
index = int(text) - 1
if not (0 <= index < len(session.entries)):
await signal.send_message(
sender,
f"Please pick a number between 1 and {len(session.entries)}.",
)
return
entry = session.entries[index]
if entry.kind == "album":
session.state = State.DOWNLOADING
await signal.send_message(sender, f"Downloading: {entry.display}")
try:
await qobuz_client.download_url(entry.item.url)
await signal.send_message(sender, "Download complete.")
except Exception as exc:
logger.error("Album download error: %s", exc)
await signal.send_message(sender, f"Download failed: {exc}")
session.reset()
return
if entry.kind == "artist":
await signal.send_message(
sender, f"Fetching discography for {entry.display}"
)
try:
albums = await qobuz_client.get_artist_albums(entry.item.url)
except Exception as exc:
logger.error("Artist albums error: %s", exc)
await signal.send_message(sender, f"Error fetching albums: {exc}")
session.reset()
return
if not albums:
await signal.send_message(sender, "No albums found for this artist.")
session.reset()
return
# Extract artist name from the display text (format: "Name - (N releases)")
artist_name = entry.display.split(" - (")[0].strip()
session.state = State.ARTIST_OPTIONS
session.pending_artist = PendingArtist(name=artist_name, albums=albums)
await signal.send_message(
sender, _artist_options_message(artist_name, len(albums))
)
return
# ── NEW SEARCH (IDLE or fallthrough) ─────────────────────────────────────
session.reset()
await signal.send_message(sender, f'Searching Qobuz for "{text}"')
try:
results = await qobuz_client.search(text)
except Exception as exc:
logger.error("Search error: %s", exc)
await signal.send_message(sender, f"Search failed: {exc}")
return
entries: list[ResultEntry] = []
for item in results.artists:
entries.append(ResultEntry(display=item.text, kind="artist", item=item))
for item in results.albums:
entries.append(ResultEntry(display=item.text, kind="album", item=item))
if not entries:
await signal.send_message(sender, "No results found. Try a different query.")
return
session.entries = entries
session.state = State.RESULTS
await signal.send_message(sender, _results_message(entries))
async def _download_album_list(
signal: SignalClient,
sender: str,
session: UserSession,
albums: list[AlbumItem],
) -> None:
errors = []
for album in albums:
try:
await qobuz_client.download_album_id(album.id)
await signal.send_message(sender, f"{album.title}")
except Exception as exc:
logger.error("Download error for album %s: %s", album.id, exc)
errors.append(album.title)
await signal.send_message(sender, f"{album.title} (failed)")
ok = len(albums) - len(errors)
if errors:
await signal.send_message(sender, f"Done — {ok}/{len(albums)} succeeded.")
else:
await signal.send_message(sender, "All downloads complete.")
session.reset()
async def main() -> None:
signal = SignalClient()
config.DOWNLOAD_DIR.mkdir(parents=True, exist_ok=True)
logger.info(
"Bot starting — signal-cli at %s:%s, downloads → %s",
config.SIGNAL_HOST,
config.SIGNAL_PORT,
config.DOWNLOAD_DIR,
)
async def dispatch(sender: str, text: str) -> None:
try:
await handle_message(signal, sender, text)
except Exception as exc:
logger.exception("Unhandled error for %s: %s", sender, exc)
try:
await signal.send_message(sender, "An unexpected error occurred. Please try again.")
except Exception:
pass
await signal.listen(dispatch)
if __name__ == "__main__":
asyncio.run(main())

27
config.py Normal file
View File

@ -0,0 +1,27 @@
import os
from pathlib import Path
from dotenv import load_dotenv
load_dotenv()
def _require(key: str) -> str:
value = os.getenv(key)
if not value:
raise RuntimeError(f"Missing required environment variable: {key}")
return value
SIGNAL_PHONE: str = _require("SIGNAL_PHONE")
SIGNAL_HOST: str = os.getenv("SIGNAL_HOST", "localhost")
SIGNAL_PORT: int = int(os.getenv("SIGNAL_PORT", "7583"))
QOBUZ_EMAIL: str = _require("QOBUZ_EMAIL")
QOBUZ_PASSWORD: str = _require("QOBUZ_PASSWORD")
QOBUZ_QUALITY: int = int(os.getenv("QOBUZ_QUALITY", "27")) # 5=MP3, 6=FLAC 16, 7=FLAC 24/96, 27=FLAC 24/192
DOWNLOAD_DIR: Path = Path(os.getenv("DOWNLOAD_DIR", "~/Music")).expanduser()
MAX_ARTIST_ALBUMS: int = int(os.getenv("MAX_ARTIST_ALBUMS", "5"))
SEARCH_ARTIST_LIMIT: int = int(os.getenv("SEARCH_ARTIST_LIMIT", "3"))
SEARCH_ALBUM_LIMIT: int = int(os.getenv("SEARCH_ALBUM_LIMIT", "5"))

130
qobuz_client.py Normal file
View File

@ -0,0 +1,130 @@
"""
Qobuz search and download via the qobuz-dl library.
search_by_type returns: [{"text": "display string", "url": "https://play.qobuz.com/TYPE/ID"}]
handle_url downloads an album or artist URL directly.
For top-N artist albums we fetch artist meta and pick N most recent albums.
"""
import asyncio
import logging
from dataclasses import dataclass
from qobuz_dl.core import QobuzDL
import config
logger = logging.getLogger(__name__)
@dataclass
class SearchItem:
text: str # formatted display string from qobuz-dl
url: str # https://play.qobuz.com/TYPE/ID
@dataclass
class AlbumItem:
id: str
title: str
artist: str
released_at: int # unix timestamp, 0 if unknown
@dataclass
class SearchResults:
artists: list[SearchItem]
albums: list[SearchItem]
def _make_client() -> QobuzDL:
client = QobuzDL(
directory=str(config.DOWNLOAD_DIR),
quality=config.QOBUZ_QUALITY,
smart_discography=True, # skip singles/EPs for artist downloads
)
client.get_tokens() # fetches app_id and secrets from the Qobuz web app
client.initialize_client(
email=config.QOBUZ_EMAIL,
pwd=config.QOBUZ_PASSWORD,
app_id=client.app_id,
secrets=client.secrets,
)
return client
_client: QobuzDL | None = None
_client_lock = asyncio.Lock()
async def get_client() -> QobuzDL:
global _client
async with _client_lock:
if _client is None:
loop = asyncio.get_running_loop()
_client = await loop.run_in_executor(None, _make_client)
return _client
def _search_sync(client: QobuzDL, query: str) -> SearchResults:
raw_artists = client.search_by_type(query, "artist", limit=config.SEARCH_ARTIST_LIMIT) or []
raw_albums = client.search_by_type(query, "album", limit=config.SEARCH_ALBUM_LIMIT) or []
return SearchResults(
artists=[SearchItem(text=r["text"], url=r["url"]) for r in raw_artists],
albums=[SearchItem(text=r["text"], url=r["url"]) for r in raw_albums],
)
async def search(query: str) -> SearchResults:
client = await get_client()
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, _search_sync, client, query)
def _artist_id_from_url(url: str) -> str:
"""Extract numeric ID from https://play.qobuz.com/artist/ID"""
return url.rstrip("/").split("/")[-1]
def _get_artist_albums_sync(client: QobuzDL, artist_url: str) -> list[AlbumItem]:
artist_id = _artist_id_from_url(artist_url)
albums: list[AlbumItem] = []
for chunk in client.client.get_artist_meta(artist_id):
for item in chunk.get("albums", {}).get("items", []):
albums.append(AlbumItem(
id=str(item.get("id", "")),
title=item.get("title", "Unknown"),
artist=item.get("artist", {}).get("name", "Unknown"),
released_at=int(item.get("released_at", 0) or 0),
))
# Sort newest first
albums.sort(key=lambda a: a.released_at, reverse=True)
return albums
async def get_artist_albums(artist_url: str) -> list[AlbumItem]:
client = await get_client()
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, _get_artist_albums_sync, client, artist_url)
def _download_url_sync(client: QobuzDL, url: str) -> None:
client.handle_url(url)
def _download_album_id_sync(client: QobuzDL, album_id: str) -> None:
client.download_from_id(album_id, album=True)
async def download_url(url: str) -> None:
"""Download an album or artist discography by Qobuz URL."""
client = await get_client()
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, _download_url_sync, client, url)
async def download_album_id(album_id: str) -> None:
"""Download a single album by its Qobuz ID."""
client = await get_client()
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, _download_album_id_sync, client, album_id)

3
requirements.txt Normal file
View File

@ -0,0 +1,3 @@
qobuz-dl>=0.9.9
websockets>=13.0
python-dotenv>=1.0.0

42
session.py Normal file
View File

@ -0,0 +1,42 @@
"""
Per-user conversation state machine.
"""
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Optional
from qobuz_client import AlbumItem, SearchItem
class State(Enum):
IDLE = auto() # waiting for a search query
RESULTS = auto() # results shown, waiting for a numbered choice
ARTIST_OPTIONS = auto() # artist chosen, waiting for "1" or "2"
DOWNLOADING = auto() # download in progress
@dataclass
class ResultEntry:
"""One numbered line shown to the user."""
display: str # text shown in the list
kind: str # "artist" or "album"
item: SearchItem # original search item (has .url)
@dataclass
class PendingArtist:
name: str
albums: list[AlbumItem] # full album list, sorted newest-first
@dataclass
class UserSession:
state: State = State.IDLE
entries: list[ResultEntry] = field(default_factory=list)
pending_artist: Optional[PendingArtist] = None
def reset(self) -> None:
self.state = State.IDLE
self.entries = []
self.pending_artist = None

97
signal_client.py Normal file
View File

@ -0,0 +1,97 @@
"""
Async client for signal-cli's JSON-RPC WebSocket daemon.
Start signal-cli daemon with:
signal-cli --config ~/.local/share/signal-cli -a +YOURPHONE daemon --json-rpc
"""
import asyncio
import json
import logging
import uuid
from collections.abc import AsyncIterator, Callable, Coroutine
from typing import Any
import websockets
from websockets.asyncio.client import ClientConnection
import config
logger = logging.getLogger(__name__)
MessageHandler = Callable[[str, str], Coroutine[Any, Any, None]]
# Signature: async def handler(sender_number: str, text: str) -> None
class SignalClient:
def __init__(self) -> None:
self._ws: ClientConnection | None = None
self._uri = f"ws://{config.SIGNAL_HOST}:{config.SIGNAL_PORT}"
async def connect(self) -> None:
self._ws = await websockets.connect(self._uri)
logger.info("Connected to signal-cli at %s", self._uri)
async def close(self) -> None:
if self._ws:
await self._ws.close()
async def send_message(self, recipient: str, text: str) -> None:
if not self._ws:
raise RuntimeError("Not connected")
payload = {
"jsonrpc": "2.0",
"method": "send",
"id": str(uuid.uuid4()),
"params": {
"account": config.SIGNAL_PHONE,
"recipient": [recipient],
"message": text,
},
}
await self._ws.send(json.dumps(payload))
logger.debug("Sent to %s: %s", recipient, text[:80])
async def _messages(self) -> AsyncIterator[dict]:
"""Yield parsed JSON-RPC frames from the WebSocket."""
async for raw in self._ws:
try:
yield json.loads(raw)
except json.JSONDecodeError:
logger.warning("Non-JSON frame: %s", raw[:200])
async def listen(self, handler: MessageHandler) -> None:
"""
Dispatch incoming Signal messages to *handler* indefinitely.
Reconnects automatically on transient errors.
"""
while True:
try:
await self.connect()
async for frame in self._messages():
sender, text = _extract_message(frame)
if sender and text:
asyncio.create_task(handler(sender, text))
except (websockets.ConnectionClosed, OSError) as exc:
logger.warning("Connection lost (%s), reconnecting in 5 s…", exc)
await asyncio.sleep(5)
def _extract_message(frame: dict) -> tuple[str | None, str | None]:
"""Return (sender_number, text) from a receive notification, or (None, None)."""
if frame.get("method") != "receive":
return None, None
envelope = frame.get("params", {}).get("envelope", {})
data = envelope.get("dataMessage", {})
if not data:
return None, None
text: str = data.get("message", "").strip()
sender: str = envelope.get("sourceNumber", "")
# Ignore empty messages, reactions, or messages with no text
if not text or not sender:
return None, None
return sender, text