at://jaystevens.me/sh.tangled.string/3m2unbhxuce22
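For reference, an at:// URI like the one above maps directly onto the `com.atproto.repo.getRecord` XRPC endpoint on the repo's hosting PDS. A minimal sketch of that mapping (the `https://bsky.social` host is an assumption for illustration; the authoritative host is whichever PDS actually serves this repo):

```python
from urllib.parse import urlencode

def get_record_url(at_uri: str, pds_host: str = "https://bsky.social") -> str:
    """Turn an at:// URI into a com.atproto.repo.getRecord XRPC URL."""
    # An at:// URI has the shape at://<authority>/<collection>/<rkey>
    authority, collection, rkey = at_uri.removeprefix("at://").split("/")
    query = urlencode({"repo": authority, "collection": collection, "rkey": rkey})
    return f"{pds_host}/xrpc/com.atproto.repo.getRecord?{query}"

print(get_record_url("at://jaystevens.me/sh.tangled.string/3m2unbhxuce22"))
```

Fetching that URL returns the same JSON record shown below.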

Record JSON

{
  "$type": "sh.tangled.string",
  "contents": "#!/usr/bin/env python3\n\n# This requires some basic knowledge of Python to use. Sorry, this was a quick-and-dirty script, not anything meant for general consumption\n# Usage:\n# * Create a venv with the PIP packages \"atproto\", \"unidecode\", and \"musicbrainzngs\" (Google \"python venv\" if you don't know how to do this)\n# * Put your Spotify exports in the same folder as this Python file\n# * Double-check that FILES_TO_CONVERT matches your Spotify export filenames\n# * Put in your MusicBrainz credentials into MBRNZ_USER/MBRNZ_PASS (yeah I'm not using environment variables, sorry)\n# * Create an app password for your ATProto/Bluesky account and put it into BSKY_HANDLE and BSKY_PASSWORD, also point BSKY_PDS at your PDS (probably https://bsky.social)\n# * Run the script (`python ./converter.py`)\n# * Watch the teal.fm firehose to verify things are going across: https://discord.com/channels/1299158421655912498/1418272538299207891\n# * (Alternatively, just look at your PDS directly with https://pdsls.dev/)\n#\n# This process will take a very long time due to Bluesky rate limits.\n# Bluesky can process 11666 records/day or 1666 records/hour, whichever is lower. Being locked out will lock you out of Bluesky itself too\n# Expect the process to take multiple days. I wrote the script to be slightly below the rate limits so just be careful how much you like posts etc.\n#\n# Some limitations:\n# * We try to fetch the MusicBrainz ID for every song, but it doesn't always work. When it fails, we leave that data unpopulated so it'll backfill\n# * There are sometimes more subtle failures where the fuzzy matching gets a _slightly_ wrong version (wrong album etc.). 
This is because Spotify\n#   song/album names do not always match what MusicBrainz has\n# * You're only \"supposed\" to scrobble songs which are more than halfway completed - we don't have that info during the scrobble process (probably\n#   could add it, but ehhh) so you sometimes get double-scrobbles when you stopped a track and then resumed listening to it later. (We do handle\n#   skipping songs, so you don't need to worry about accidentally tracking a skip)\n# * People will get to watch your library slowly uploading to the Teal.FM firehose, and that can be embarrassing\n\n# Boring license stuff:\n# DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE\n#                    Version 2, December 2004\n#\n# Copyright (C) 2004 Sam Hocevar \u003csam@hocevar.net\u003e\n#\n# Everyone is permitted to copy and distribute verbatim or modified\n# copies of this license document, and changing it is allowed as long\n# as the name is changed.\n#\n#            DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE\n#   TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION\n#\n#  0. 
You just DO WHAT THE FUCK YOU WANT TO.\n#\n# (Also I do not take liability if this blows your stuff up, if it fails catastrophically somehow that's on you)\n\nimport json\nimport musicbrainzngs\nimport re\nimport string\nimport time\nimport unicodedata\n\nfrom atproto import Client\nfrom datetime import datetime, timedelta, timezone\nfrom collections import deque\nfrom unidecode import unidecode\nfrom zoneinfo import ZoneInfo\n\n# Grab your Spotify exports (the full Spotify data, not the truncated one - there's 2 options, make sure you're grabbing the right one)\n# Once you have 'em, put them in the same folder as you're putting this Python script\nFILES_TO_CONVERT = [\n    'Streaming_History_Audio_2012-2019_0.json',\n    'Streaming_History_Audio_2019-2022_1.json',\n    'Streaming_History_Audio_2022-2025_2.json'\n]\n\n# MusicBrainz username and password - yes, I'm hardcoding secrets, just be careful with 'em\nMBRNZ_USER = YOUR MUSICBRAINZ USERNAME GOES HERE\nMBRNZ_PASS = YOUR MUSICBRAINZ PASSWORD GOES HERE\nMBRNZ_CONTACT = YOUR CONTACT INFO SO MUSICBRAINZ CAN YELL AT YOU IF YOU USE ALL THEIR BANDWIDTH\n\n# PDS handle and app password (I am not implementing OAuth for this)\nBSKY_PDS_URL = YOUR PDS HERE (PROBABLY https://bsky.social)\nBSKY_HANDLE = YOUR BLUESKY HANDLE WITHOUT THE @\nBSKY_PASSWORD = YOUR BLUESKY APP PASSWORD\n\n# If you are on a Mushroom PDS (if you don't know what that means, you are on a Mushroom PDS) then Bluesky puts special rate limits on you for hourly/daily usage\n# Exceeding these limits will lock you out of EVERYTHING on the network, not just Teal but also Bluesky, Tangled, everything ATProto\nBSKY_RATE_LIMIT_HOURLY = 1666\nBSKY_RATE_LIMIT_DAILY = 11666\n\nPROVIDER = \"tealfm\"\nMB_CACHE = {}\n\npds = Client(BSKY_PDS_URL)\n\npunctuation_table = str.maketrans('', '', string.punctuation)\n\nlast_cache_miss = 0.0\ntotal_songs = 0\n\nmusicbrainzngs.auth(MBRNZ_USER, MBRNZ_PASS)\nmusicbrainzngs.set_useragent(\"Jay's Teal.fm Spotify Importer\", 
\"0.1\", MBRNZ_CONTACT)\n\ndef main():\n    print(\"Starting convert\\n\")\n    start_time = time.perf_counter()\n\n    all_songs = []\n\n    # Handle the session being interrupted midway through\n    cutoff_ts = datetime.max\n\n    start_str = None\n    try:\n        with open(PROVIDER + \"_last_ts.txt\", \"r\", encoding=\"utf-8\") as f:\n            start_str = f.readline().strip()\n    except FileNotFoundError:\n        # First run - no checkpoint file has been written yet\n        pass\n\n    if start_str:\n        start_ts = datetime.strptime(start_str, \"%Y-%m-%dT%H:%M:%SZ\")\n    else:\n        start_ts = datetime.min\n\n    # Parse each file\n    for file in FILES_TO_CONVERT:\n        print(\"Parsing \" + file)\n\n        with open(file, 'r', encoding='utf-8') as open_file:\n            data = json.load(open_file)\n\n        for entry in data:\n            skipped = entry.get('skipped', False)\n            incognito = entry.get('incognito_mode', False)\n            played_seconds = int(entry['ms_played']) / 1000.0\n\n            artist = entry['master_metadata_album_artist_name']\n            track = entry['master_metadata_track_name']\n            album = entry['master_metadata_album_album_name']\n\n            # Skip podcasts/audiobooks\n            if artist is None or track is None:\n                continue\n            if entry.get(\"episode_name\") or entry.get(\"audiobook_title\"):\n                continue\n\n            global total_songs\n            total_songs += 1\n\n            # Parse timestamp\n            try:\n                ts_utc = datetime.strptime(entry['ts'], \"%Y-%m-%dT%H:%M:%SZ\")\n                ts = ts_utc.replace(tzinfo=ZoneInfo(\"UTC\")).astimezone(ZoneInfo(\"America/Los_Angeles\"))\n            except Exception:\n                ts_utc = datetime.now(timezone.utc)\n                ts = datetime.now()\n\n            if skipped or played_seconds \u003c 30:\n                continue\n\n            # Skip anything after our end cutoff\n            if ts_utc \u003e cutoff_ts:\n                continue\n\n            # Skip anything we have 
already imported\n            if ts_utc \u003c= start_ts:\n                continue\n\n            # Skip anything we didn't want to log\n            if incognito:\n                continue\n\n            formatted = entry['ts']\n            track_uri = entry['spotify_track_uri']\n            song_data = [artist, track, album, formatted, artist, str(int(played_seconds)), track_uri]\n            all_songs.append(song_data)\n\n    print(\"Finished parsing songs\")\n\n    scrobble_to_pds(all_songs)    \n\n    print(\"\\nConvert finished in \" + str(time.perf_counter() - start_time) + \" seconds.\")\n\ndef normalize_key(s):\n    if s is None:\n        return ''\n\n    # Decompose Unicode characters (NFKD), remove accents\n    s = unicodedata.normalize('NFKD', s)\n    s = unidecode(s)  # Romanize non-Latin scripts\n    s = s.lower().strip()\n\n    # Common replacements before cleanup\n    replacements = {\n        r'\\boriginal motion picture soundtrack\\b': 'ost',\n        r'\\boriginal soundtrack\\b': 'ost',\n        r'\\bsoundtrack\\b': 'ost',\n        r'\\bvol(\\.|ume)?\\b': 'vol',\n        r'\\bpart\\b': 'pt',\n        r'\\bparts\\b': 'pt',\n        r'\\bedition\\b': '',\n        r'\\bthe\\b': '',\n        r'\\band\\b': '',\n        r'\\bep\\b': '',\n        r'\\bwalt disney records\\b': '',\n        r'\\blegacy collection\\b': '',\n        r'\\bgreatest hits\\b': '',\n        r'\\breissue(d)?\\b': '',\n        r'\\bre-issue(d)?\\b': '',\n        r'\\bsong of the\\b': '',\n        r'\\bost\\b': '',\n        r'\\bdeluxe\\b': '',\n    }\n    for pattern, repl in replacements.items():\n        s = re.sub(pattern, repl, s, flags=re.IGNORECASE)\n\n    # Remove tags like “Remastered”, “Deluxe”, “Expanded”, etc.\n    cleanup_patterns = [\n        r'\\bfrom\\b.*$',  # remove everything after 'from'\n        r'\\(.*\\)',\n        r'\\[.*(remaster(ed)?|deluxe|expanded|ep|single|greatest hits|anniversary|special edition|bonus tracks?|credits 
track|version|mix|mono|stereo|reissue(d)?).*?\\]',\n        r'[-–:]\\s*(remaster(ed)?(\\s*\\d{4})?|ep|single|deluxe|expanded|anniversary|greatest hits|special edition|ghost note symphonies|country version|bonus tracks?|credits track|version|mix|mono|stereo|reissue(d)?).*$',\n    ]\n    for pattern in cleanup_patterns:\n        s = re.sub(pattern, '', s, flags=re.IGNORECASE)\n\n    # Remove trailing artist/cover info\n    s = re.sub(r'\\s*-\\s*(?:cover|live|remaster|remix|version|edit|single|mono|stereo|mix|karaoke|instrumental|feat\\.?|featuring)\\b.*$', '', s, flags=re.IGNORECASE)\n    s = re.sub(r'\\(.*cover.*?\\)', '', s, flags=re.IGNORECASE)  # removes \"(Pink Floyd cover)\"\n    s = re.sub(r'\\[.*cover.*?\\]', '', s, flags=re.IGNORECASE)  # removes \"[Pink Floyd cover]\"\n    s = re.sub(r'\\s*(feat\\.?|featuring)\\s+.*$', '', s, flags=re.IGNORECASE)\n\n    # Standardize quotes/apostrophes\n    s = s.replace(\"’\", \"'\").replace(\"‘\", \"'\").replace(\"“\", '\"').replace(\"”\", '\"')\n    s = ''.join(c for c in s if not unicodedata.combining(c))\n\n    # Remove punctuation\n    s = s.translate(punctuation_table)\n\n    # Convert Roman numerals I–XX to numbers\n    roman_map = {\n        'xx': '20', 'xix': '19', 'xviii': '18', 'xvii': '17', 'xvi': '16', 'xv': '15',\n        'xiv': '14', 'xiii': '13', 'xii': '12', 'xi': '11', 'x': '10', 'ix': '9',\n        'viii': '8', 'vii': '7', 'vi': '6', 'v': '5', 'iv': '4', 'iii': '3',\n        'ii': '2', 'i': '1'\n    }\n    for roman, arabic in roman_map.items():\n        s = re.sub(rf'\\b{roman}\\b', arabic, s, flags=re.IGNORECASE)\n\n    # Convert written numbers one–twenty to digits\n    word_nums = {\n        'one': '1', 'two': '2', 'three': '3', 'four': '4', 'five': '5',\n        'six': '6', 'seven': '7', 'eight': '8', 'nine': '9', 'ten': '10',\n        'eleven': '11', 'twelve': '12', 'thirteen': '13', 'fourteen': '14',\n        'fifteen': '15', 'sixteen': '16', 'seventeen': '17', 'eighteen': '18',\n        
'nineteen': '19', 'twenty': '20'\n    }\n    for word, num in word_nums.items():\n        s = re.sub(rf'\\b{word}\\b', num, s, flags=re.IGNORECASE)\n\n    # Strip punctuation again and normalize spaces\n    s = re.sub(r'[^\\w\\s]', '', s)\n    s = re.sub(r'\\s+', '', s)\n\n    return s\n\ndef scrobble_to_pds(all_songs):\n    # Login using your Bluesky/PDS credentials\n    pds.login(BSKY_HANDLE, BSKY_PASSWORD)\n\n    remaining = len(all_songs)\n    \n    request_times = deque()\n    failed_lookups = {}\n\n    last_pds_access_time = 0.0\n\n    # Favor the daily limit if there's any potential for it being close\n    if len(all_songs) \u003e= BSKY_RATE_LIMIT_DAILY * 0.75:\n        time_between_posts = (60 * 60 * 24) / (BSKY_RATE_LIMIT_DAILY * 0.9)\n    else:\n        # Can skirt a little closer to the hourly limit\n        time_between_posts = (60 * 60) / (BSKY_RATE_LIMIT_HOURLY * 0.95)\n\n    print(\"\")\n    print(\"\")\n    print(\"\")\n    print(\"Rate limit: \" + str(time_between_posts))\n\n    # Wait a bit in case the job was re-kicked\n    time.sleep(time_between_posts)\n\n    print(f\"Starting PDS scrobble at {time.time()}\")\n    scrobble_begin_time = time.perf_counter()\n\n    for entry in all_songs:\n        start_time = time.perf_counter()\n        artist = entry[0]\n        title = entry[1]\n        album = entry[2]\n        timestamp = entry[3]\n        url = entry[6]\n\n        mbrn_track = lookup_track(artist, album, title)\n\n        if mbrn_track.get(\"releaseMbId\") is None:\n            track_norm = normalize_key(title)\n            album_norm = normalize_key(album)\n            key = f\"{normalize_key(artist)}::{album_norm}::{track_norm}\"\n            print(f\"⚠️ No release found for {key}\")\n\n        retry_time = 2\n\n        time_remaining = remaining * time_between_posts\n        eta_utc = datetime.now(timezone.utc) + timedelta(seconds=time_remaining)\n        eta_local = eta_utc.astimezone()\n        if time_remaining \u003c 60:\n            
time_remaining_str = str(time_remaining) + \" seconds\"\n        else:\n            time_remaining /= 60\n            if time_remaining \u003c 60:\n                time_remaining_str = str(time_remaining) + \" minutes\"\n            else:\n                time_remaining /= 60\n                if time_remaining \u003c 48:\n                    time_remaining_str = str(time_remaining) + \" hours\"\n                else:\n                    time_remaining /= 24\n                    time_remaining_str = str(time_remaining) + \" days\"\n\n        print(f\"{title} - {artist} ({album}) - listened to at {timestamp}\")\n        print(\"Remaining records to be processed: \" + str(remaining) + \"; \" + str(round((1.0 - (remaining / total_songs)) * 100)) + \"% complete\")\n        print(\"⏳ ETA: \" + time_remaining_str + \" (finishing on \" + eta_local.strftime(\"%A, %Y-%m-%d at %I:%M:%S %p %Z\") + \")\")\n\n        # --- 2. Create the record data ---\n        record = {\n            \"$type\": \"fm.teal.alpha.feed.play\",\n            \"playedTime\": timestamp,\n            \"artists\": mbrn_track.get(\"artists\", [{\"artistName\": artist}]),\n            \"trackName\": mbrn_track.get(\"trackName\", title),\n            \"recordingMbId\": mbrn_track.get(\"recordingMbId\"),\n            \"releaseName\": mbrn_track.get(\"releaseName\", album),\n            \"releaseMbId\": mbrn_track.get(\"releaseMbId\"),\n            \"duration\": mbrn_track.get(\"duration\"),\n            \"submissionClientAgent\": \"manual/unknown\",\n            \"musicServiceBaseDomain\": \"spotify.com\",\n            \"originUrl\": url\n        }\n        if mbrn_track.get(\"releaseMbId\") is None:\n            track_norm = normalize_key(title)\n            album_norm = normalize_key(album)\n            key = f\"{normalize_key(artist)}::{album_norm}::{track_norm}\"\n            failed_lookups[key] = record\n\n        success = False\n        while not success:\n            try:\n                delta_time 
= time.perf_counter() - last_pds_access_time                \n                sleep_time = time_between_posts - delta_time\n                if sleep_time \u003e 0:\n                    print(\"Waiting \" + str(sleep_time) + \" before trying to contact PDS to stay under Mushroom rate limits\")\n                    time.sleep(sleep_time)\n                print(\"\")\n                print(\"Pushing to PDS\")\n\n                now = time.time()\n                request_times.append(now)\n                last_pds_access_time = time.perf_counter()\n\n                # --- 3. Publish the record to your repo ---\n                response = pds.com.atproto.repo.create_record(\n                    data={\n                        \"repo\": pds.me.did,\n                        \"collection\": \"fm.teal.alpha.feed.play\",\n                        \"record\": record,\n                    }\n                )\n                success = True\n            except Exception as e:\n                    print(f\"⚠️ Error posting {title}: {e}, waiting {retry_time} seconds\")\n                    time.sleep(retry_time)\n                    retry_time *= 2\n                    if retry_time \u003e 14400:\n                        retry_time = 2\n\n        print(\"✅ Record \" + str(record) + \" created successfully!\")\n        print(\"AT URI:\" + response.uri + \"; CID: \" + response.cid)\n        print(\"\")\n\n        # Drop timestamps older than one hour\n        while request_times and request_times[0] \u003c now - 3600:\n            request_times.popleft()\n\n        remaining -= 1\n\n        with open(PROVIDER + \"_last_ts.txt\", \"w\", encoding=\"utf-8\") as f:\n            f.write(f\"{entry[3]}\")\n\n        time_spent = time.perf_counter() - start_time\n        total_time_spent = time.perf_counter() - scrobble_begin_time\n\n        # --- 4. 
Print results ---\n        print(\"Took \" + str(time_spent))\n        print(\"Requests so far this hour: \" + str(len(request_times)) + \" (working for \" + str(total_time_spent / 60.0) + \" minutes)\")\n        print(\"\")\n\n    print(\"\")\n    print(\"\")\n\ndef get_from_cache(key):\n    \"\"\"Return cached result if valid, else None.\"\"\"\n    return MB_CACHE.get(key)\n\ndef set_cache(key, recording, release):\n    \"\"\"Store result in cache with timestamp.\"\"\"\n\n    if recording != None:\n        metadata = {\n                \"trackName\": recording[\"title\"],\n                \"recordingMbId\": recording[\"id\"],\n                \"duration\": int(int(recording.get(\"length\", 0)) / 1000) if \"length\" in recording else None,\n            }\n\n        if \"artist-credit\" in recording:\n            metadata[\"artists\"] = []\n\n            for credit in recording[\"artist-credit\"]:\n                if \"artist\" in credit:\n                    a = credit[\"artist\"]\n                    metadata[\"artists\"].append({\"artistName\": a[\"name\"], \"artistMbId\": a[\"id\"]})\n\n        metadata[\"releaseName\"] = release.get(\"title\", None)\n        metadata[\"releaseMbId\"] = release.get(\"id\", None)\n    else:\n        print(f\"⚠️ Couldn't find recording: \" + key)\n        metadata = {}\n\n    MB_CACHE[key] = {\"data\": metadata, \"timestamp\": time.time()}\n    print(\"Cached \" + key + \": \" + str(metadata))\n    print(\"New cache size \" + str(len(MB_CACHE)))\n    print(\"\")\n    print(\"\")\n\ndef match_artist(artist_input, artist_credits):\n    \"\"\"Check if any artist in the credits matches the input.\"\"\"\n    artist_input_norm = normalize_key(artist_input)\n    for credit in artist_credits:\n        if \"artist\" in credit:\n            mb_artist_record = credit[\"artist\"]\n            artist_key = normalize_key(mb_artist_record[\"name\"])\n            if artist_key == artist_input_norm:\n                return True\n\n            for 
alias in mb_artist_record.get('alias-list', []):\n                if normalize_key(alias['alias']) == artist_input_norm:\n                    return True\n\n            print(\"Not same artist: \" + artist_key + \" != \" + artist_input_norm)\n    return False\n\ndef parse_date(date_str):\n    \"\"\"Parse MusicBrainz release date safely.\"\"\"\n    if not date_str:\n        return None\n    try:\n        # handle partial dates like \"2001-05\" or \"2001\"\n        parts = date_str.split(\"-\")\n        if len(parts) == 1:\n            return datetime(int(parts[0]), 1, 1)\n        elif len(parts) == 2:\n            return datetime(int(parts[0]), int(parts[1]), 1)\n        else:\n            return datetime(int(parts[0]), int(parts[1]), int(parts[2]))\n    except Exception:\n        return None\n\ndef lookup_track(artist_name, album_name, track_name):\n    \"\"\"\n    Look up a track on MusicBrainz by artist and title.\n    Returns the best match or None.\n    \"\"\"\n    global last_cache_miss\n\n    track_norm = normalize_key(track_name)\n    album_norm = normalize_key(album_name)\n    key = f\"{normalize_key(artist_name)}::{album_norm}::{track_norm}\"\n    cached = get_from_cache(key)\n    if cached:\n        return cached[\"data\"]\n    \n    miss_time = time.perf_counter()\n    delta_time = miss_time - last_cache_miss\n    \n    print(\"Cache miss for \" + key + \", first miss in \" + str(delta_time) + \" seconds\")\n\n    sleep_time = 1.0 - delta_time\n    if sleep_time \u003e 0:\n        print(\"Waiting \" + str(sleep_time) + \" before trying to fetch track metadata\")\n        time.sleep(sleep_time)\n\n    try:\n        # Grab several results in case we need to match the album\n        result = musicbrainzngs.search_recordings(\n            artist=artist_name,\n            recording=track_name,\n            limit=10,\n        )\n        last_cache_miss = time.perf_counter()\n        recordings = result.get(\"recording-list\", [])\n\n        if not 
recordings:\n            set_cache(key, None, None)\n            return get_from_cache(key)[\"data\"]\n\n        # Filter by album if provided\n        candidates = []\n        for rec in recordings:\n            rec_artists = rec.get(\"artist-credit\", [])\n            if not match_artist(artist_name, rec_artists):\n                continue\n\n            rec_track_norm = normalize_key(rec.get(\"title\"))\n            if rec_track_norm != track_norm:\n                print(\"Not same track: \" + rec_track_norm + \" != \" + track_norm)\n                continue\n\n            for release in rec.get(\"release-list\", []):\n                rec_album_norm = normalize_key(release.get(\"title\"))\n\n                status = release.get(\"status\", \"\")\n                if status != \"Official\":\n                    print(\"Non-official: \" + rec_album_norm + \" (\" + status + \")\")\n                    continue\n\n                release_date = parse_date(release.get(\"date\"))\n                candidates.append((release_date, rec, release))\n\n                if rec_album_norm != album_norm:\n                    print(\"Not same album: \" + rec_album_norm + \" != \" + album_norm)\n                    continue\n\n                print(\"Matched: \" + album_norm)\n\n                # Perfect match\n                set_cache(key, rec, release)\n                return get_from_cache(key)[\"data\"]\n\n        # No exact album match - fall back to the earliest official release\n        # (this is where the fuzzy matching can pick a slightly wrong version)\n        if candidates:\n            candidates.sort(key=lambda c: (c[0] is None, c[0] or datetime.min))\n            _, rec, release = candidates[0]\n            set_cache(key, rec, release)\n            return get_from_cache(key)[\"data\"]\n\n        # Couldn't find anything\n        set_cache(key, None, None)\n        return get_from_cache(key)[\"data\"]\n    except musicbrainzngs.WebServiceError as e:\n        print(f\"MusicBrainz lookup error: {e}\")\n        return {}\n\nmain()\n",
  "createdAt": "2025-10-10T21:43:28Z",
  "description": "Converts a Spotify full account export (not the limited export, the full one) to PDS records compatible with Teal.fm's \"fm.teal.alpha.feed.play\" lexicon",
  "filename": "converter.py"
}
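The pacing math buried in the script's comments (1666 record writes/hour and 11666/day on a Bluesky-hosted PDS, exceeding which locks the whole account out) can be pulled out into a standalone sketch. The 0.75 threshold and 0.9/0.95 safety factors below are copied from the script itself, not official Bluesky numbers:

```python
BSKY_RATE_LIMIT_HOURLY = 1666   # record writes per hour (Bluesky-hosted PDS)
BSKY_RATE_LIMIT_DAILY = 11666   # record writes per rolling day

def seconds_between_posts(total_records: int) -> float:
    """Mirror the script's pacing: large imports are spaced against the
    daily budget, small ones skirt just under the hourly budget."""
    if total_records >= BSKY_RATE_LIMIT_DAILY * 0.75:
        # 90% of the daily budget spread over 24 hours
        return (60 * 60 * 24) / (BSKY_RATE_LIMIT_DAILY * 0.9)
    # 95% of the hourly budget spread over one hour
    return (60 * 60) / (BSKY_RATE_LIMIT_HOURLY * 0.95)

print(seconds_between_posts(50000))  # ~8.2 s/post, so a 50k-play history takes days
print(seconds_between_posts(1000))   # ~2.3 s/post for a small import
```

This is why the script warns that a full history takes multiple days: 50,000 plays at roughly 8.2 seconds apart is close to five days of continuous posting.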