diff --git a/skills/notebooklm/SKILL.md b/skills/notebooklm/SKILL.md new file mode 100644 index 00000000..df4e5e45 --- /dev/null +++ b/skills/notebooklm/SKILL.md @@ -0,0 +1,177 @@ +--- +name: notebooklm +description: "Turn expert podcasts into personalized protocols with cited experiments. Load 300 episodes from terminal, run an expert-informed interview, build experiments in your Obsidian morning routine. Use when user says \"notebooklm\", \"load channel\", \"expert interview\", \"notebooklm ask\", \"health protocol\", or wants to turn expert content into actionable experiments." +--- + +# NotebookLM - Expert Knowledge to Action + +Turn any expert's content into a personalized protocol with experiments you actually run. Load 300 YouTube episodes into NotebookLM from terminal, run a cited interview about your goal, create experiments in your Obsidian daily note. + +**Video walkthrough:** [https://youtu.be/KRpZSvtMiTI](https://youtu.be/KRpZSvtMiTI) + +## What This Does + +1. **Load sources from terminal.** You can't just tell NotebookLM to add a YouTube channel. This skill does it. One command. 300 episodes. +2. **Cited answers traced to exact transcript lines.** Every recommendation links back to the exact episode and passage. Verifiable. +3. **Expert-informed interviews.** Claude queries NotebookLM with YOUR goal. Generates questions informed by the expert's research on your specific topic. +4. **Experiments in Obsidian.** Protocol becomes experiments in your daily note. Morning routine skill asks every day: how is this going? +5. **Any expert, any domain.** Huberman for health. Lenny for product. Onboarding docs for a new job. Same pattern. + +## Prerequisites + +### 1. Install nlm CLI + +```bash +uv tool install notebooklm-mcp-cli +``` + +Gives you the `nlm` command. See [notebooklm-mcp-cli](https://github.com/jacob-bd/notebooklm-mcp-cli) for details. + +### 2. 
Install notebooklm-py (for notebook creation and channel loading) + +```bash +pip install "notebooklm-py[browser]" +playwright install chromium +``` + +### 3. Authenticate + +```bash +# nlm CLI auth (for queries and source listing) +nlm auth login + +# notebooklm-py auth (for notebook creation and loading) +notebooklm login +``` + +Both open a browser window for Google login. `nlm` saves to its own config, and `notebooklm-py` saves cookies to `~/.notebooklm/storage_state.json`. + +### 4. Obsidian Plugins + +- **Dataview** (required) - for dashboard queries and citation tables + +## Quick Start + +```bash +# List your notebooks +nlm notebook list + +# Ask a question with citations +nlm notebook query "What does Huberman say about deep focus?" --json + +# List sources +nlm source list --json +``` + +## Workflow Routing + +| User says | Workflow | +|-----------|----------| +| "load channel", "youtube channel", "bulk load videos" | [workflows/youtube-channel.md](workflows/youtube-channel.md) | +| "notebooklm ask", "ask notebook", "Q&A" | [workflows/ask.md](workflows/ask.md) | +| "import notebook", "import sources" | [workflows/import.md](workflows/import.md) | +| "notebooklm auth", "notebooklm login" | [workflows/auth.md](workflows/auth.md) | + +## The Full Pipeline + +This is the workflow shown in the video: + +### 1. Pick your expert and goal + +``` +Goal: "I want to improve my health and focus" +Expert: Andrew Huberman (@hubermanlab on YouTube) +``` + +### 2. Load their content + +```bash +# Scrape channel videos +python3 scripts/load_channel.py scrape \ + --channel "https://www.youtube.com/@hubermanlab" \ + --output /tmp/huberman-videos.json + +# Create notebook +notebooklm create "Andrew Huberman - Health" + +# Load the 200 most recent episodes +notebooklm use +python3 scripts/load_channel.py load \ + --videos /tmp/huberman-videos.json \ + --notebook \ + --count 200 \ + --concurrency 20 +``` + +### 3. 
Ask expert-informed questions + +```bash +nlm notebook query \ + "What does Huberman recommend for sustaining deep focus for 4+ hours daily?" --json +``` + +Each answer comes with `[N]` citations back to the exact source and passage. + +### 4. Run a cited interview + +Claude uses the notebook to generate interview questions specific to YOUR goal. You answer honestly. Claude builds a personalized protocol where each recommendation is tied to an exact episode. + +### 5. Create experiments + +The protocol becomes experiments in your Obsidian vault: +- Each experiment has a hypothesis, protocol, success criteria, and timeframe +- They appear in your daily note every morning +- Your morning routine skill asks: "How is this experiment going? Any observations?" + +### 6. Turn it into a reusable skill + +Package the workflow as a `/huberman` or `/lenny` skill. Same pattern, different expert. + +## Vault Structure + +``` +Your Vault/ +├── Notes/NotebookLM/ +│ ├── Huberman Health.md # type: notebook (index) +│ └── huberman-health/ +│ ├── Sources/ # type: notebook-source (transcripts) +│ │ └── Episode Title.md +│ └── QA/ # type: nlm-query (cited answers) +│ └── 2026-04-05 Focus Protocol.md +├── Notes/Experiments/ +│ └── Morning Sunlight Protocol.md # type: experiment +└── Notes/Dashboards/ + └── Health.md # Dashboard with embedded experiments +``` + +## Scripts + +| Script | Purpose | +|--------|---------| +| `scripts/load_channel.py` | Scrape YouTube channel + bulk-load into NotebookLM | +| `scripts/resolve_citations.py` | Replace `[N]` with `[[Source#^anchor\|[N]]]` wikilinks | +| `scripts/import_sources.py` | Import sources as vault files with metadata | +| `scripts/extract_passages.py` | Extract cited passages from Q&A into source files | +| `scripts/backfill_fulltext.py` | Fetch full transcripts for source files | + +All scripts use `Path.cwd()` as vault root. Run them from your vault directory. 
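To make the vault-root convention concrete, here is a minimal sketch of how a source note's path can be derived from the current directory. It mirrors the `safe_filename` rules and the `Notes/NotebookLM/<slug>/Sources` layout used in `import_sources.py`; the `source_note_path` helper and the example title are hypothetical names introduced only for this illustration.

```python
import re
from pathlib import Path


def safe_filename(title: str) -> str:
    """Make a source title safe for the filesystem (same rules as import_sources.py)."""
    title = re.sub(r'[/:*?"<>|]', '-', title)   # replace characters that are unsafe in filenames
    title = re.sub(r'\s+', ' ', title).strip()  # collapse runs of whitespace
    if len(title) > 120:
        title = title[:120].rstrip(' -')        # cap length, drop a dangling separator
    return title


def source_note_path(vault: Path, slug: str, title: str) -> Path:
    """Hypothetical helper: where the note for `title` would live inside the vault."""
    return vault / "Notes/NotebookLM" / slug / "Sources" / (safe_filename(title) + ".md")


if __name__ == "__main__":
    # Path.cwd() stands in for the vault root, which is why the scripts
    # are meant to be run from the vault directory.
    print(source_note_path(Path.cwd(), "huberman-health", "Sleep: Tools / Protocols?"))
```

Run from a different directory, the same call would point at a different (probably nonexistent) `Notes/NotebookLM` tree, which is the likely cause if a script reports every source as missing.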
+ +## Citation Resolution + +The resolver turns `[N]` markers in NotebookLM answers into clickable `[[Source#^c-XXXXXXXX|[N]]]` wikilinks. Click to jump to the exact cited passage in the source transcript. + +- Anchor IDs are stable (first 8 hex characters of the MD5 of the cited text's first 100 characters) +- Idempotent: re-running the same question skips existing anchors +- Cross-source citation remap: reassigns citations when NotebookLM collapses several sources onto one source_id +- ~96% resolution rate across tested queries + +## Examples + +- **Health:** 300 Huberman episodes -> personalized health protocol with sleep, supplement, and exercise experiments +- **Product:** 200 Lenny's Podcast episodes -> product strategy playbook with cited frameworks +- **New job:** Onboarding docs + team wikis + architecture decisions -> ramp-up plan with daily experiments +- **Business:** Hormozi content -> offer audit with value equation scoring + +## License + +MIT diff --git a/skills/notebooklm/scripts/backfill_fulltext.py b/skills/notebooklm/scripts/backfill_fulltext.py new file mode 100644 index 00000000..077a1528 --- /dev/null +++ b/skills/notebooklm/scripts/backfill_fulltext.py @@ -0,0 +1,106 @@ +#!/usr/bin/env python3 +"""Backfill source files with fulltext content from NotebookLM. 
+ +Usage: + cd ~/projects/notebooklm-loader && uv run python3 \ + /path/to/backfill_fulltext.py \ + --notebook \ + --slug \ + --vault /path/to/vault \ + --concurrency 10 +""" +import argparse +import asyncio +import json +import re +import sys +import time +from pathlib import Path + + +def safe_filename(title): + safe = re.sub(r'[/:*?"<>|]', '-', title) + safe = re.sub(r'\s+', ' ', safe).strip() + if len(safe) > 120: + safe = safe[:120].rstrip(' -') + return safe + + +success = 0 +failed = 0 +skipped = 0 + + +async def fetch_and_write(client, sem, notebook_id, source, sources_dir): + global success, failed, skipped + sid = source["id"] + title = source["title"].strip() + filename = safe_filename(title) + ".md" + filepath = sources_dir / filename + + if not filepath.exists(): + skipped += 1 + return + + content = filepath.read_text() + if "## Transcript" in content: + skipped += 1 + return + + async with sem: + try: + ft = await client.sources.get_fulltext(notebook_id, sid) + if not ft.content or len(ft.content) < 100: + failed += 1 + print(f" EMPTY: {filename}", file=sys.stderr) + return + + # Append transcript section + new_content = content.rstrip() + "\n\n## Transcript\n\n" + ft.content + "\n" + filepath.write_text(new_content) + success += 1 + print(f" OK: {filename} ({len(ft.content)} chars)") + except Exception as e: + failed += 1 + print(f" FAIL: {filename} | {str(e)[:80]}", file=sys.stderr) + + +async def main(): + from notebooklm import NotebookLMClient + + parser = argparse.ArgumentParser() + parser.add_argument("--notebook", required=True) + parser.add_argument("--slug", required=True) + parser.add_argument("--vault", default=".") + parser.add_argument("--concurrency", type=int, default=10) + parser.add_argument("--sources-json", help="Path to source list JSON (skips API call)") + args = parser.parse_args() + + vault = Path(args.vault) + sources_dir = vault / "Notes/NotebookLM" / args.slug / "Sources" + + client = await NotebookLMClient.from_storage() + 
async with client: + if args.sources_json: + with open(args.sources_json) as f: + source_list = json.load(f)["sources"] + else: + raw = await client.sources.list(args.notebook) + source_list = [{"id": s.id, "title": s.title or ""} for s in raw] + + total = len(source_list) + print(f"Backfilling {total} sources (concurrency={args.concurrency})") + + sem = asyncio.Semaphore(args.concurrency) + tasks = [ + fetch_and_write(client, sem, args.notebook, s, sources_dir) + for s in source_list + ] + await asyncio.gather(*tasks) + + print(f"\nDone: {success} written, {skipped} skipped, {failed} failed") + +if __name__ == "__main__": + t0 = time.time() + asyncio.run(main()) + print(f"Elapsed: {time.time() - t0:.0f}s") diff --git a/skills/notebooklm/scripts/extract_passages.py b/skills/notebooklm/scripts/extract_passages.py new file mode 100644 index 00000000..2d51f06e --- /dev/null +++ b/skills/notebooklm/scripts/extract_passages.py @@ -0,0 +1,134 @@ +#!/usr/bin/env python3 +"""Extract cited_text passages from Q&A JSONs and append to source files. + +Usage: + python3 extract_passages.py --qa /tmp/qa-1.json /tmp/qa-2.json \ + --sources /tmp/sources.json --slug my-notebook + +Reads all Q&A JSONs, deduplicates cited_text chunks per source (first 100 chars as key), +and appends a ## Cited Passages section to each source file with ### Passage 1, 2, etc. + +Incremental - appends new passages to sources that already have a ## Cited Passages section. 
+""" +import argparse +import json +import re +import sys +from pathlib import Path + +VAULT = Path.cwd() + + +def safe_filename(title: str) -> str: + """Same logic as import_sources.py.""" + title = re.sub(r'[/:*?"<>|]', '-', title) + title = re.sub(r'\s+', ' ', title).strip() + if len(title) > 120: + title = title[:120].rstrip(' -') + return title + + +def main(): + parser = argparse.ArgumentParser(description="Extract cited passages into source files") + parser.add_argument("--qa", nargs="+", required=True, help="Q&A JSON files") + parser.add_argument("--sources", required=True, help="Sources JSON file") + parser.add_argument("--slug", required=True, help="Notebook slug") + args = parser.parse_args() + + # Build source_id -> title mapping + with open(args.sources) as f: + sources_data = json.load(f) + + source_titles = {} + for s in sources_data["sources"]: + title = s["title"].strip() + if title == "- YouTube" or len(title) < 3: + continue + source_titles[s["id"]] = title + + # Collect unique chunks per source across all QA files + # Key: source_id -> list of unique cited_text (deduped by first 100 chars) + source_chunks: dict[str, list[str]] = {} + seen_keys: dict[str, set[str]] = {} + + for qa_file in args.qa: + with open(qa_file) as f: + data = json.load(f) + for ref in data["references"]: + ct = ref.get("cited_text", "").strip() + if not ct: + continue + sid = ref["source_id"] + if sid not in source_titles: + continue + dedup_key = ct[:100] + if sid not in seen_keys: + seen_keys[sid] = set() + source_chunks[sid] = [] + if dedup_key not in seen_keys[sid]: + seen_keys[sid].add(dedup_key) + source_chunks[sid].append(ct) + + print(f"Sources with passages: {len(source_chunks)}", file=sys.stderr) + + sources_dir = VAULT / "Notes/NotebookLM" / args.slug / "Sources" + updated = 0 + skipped = 0 + + for sid, chunks in source_chunks.items(): + title = source_titles[sid] + filename = safe_filename(title) + ".md" + filepath = sources_dir / filename + + if not 
filepath.exists(): + print(f" MISSING: {filename}", file=sys.stderr) + skipped += 1 + continue + + content = filepath.read_text() + if "## Cited Passages" in content: + # Parse existing passages to find what's already there + existing_keys = set() + existing_count = 0 + for m in re.finditer(r'### Passage (\d+)\n\n(.+?)(?=\n### Passage |\Z)', content, re.DOTALL): + existing_count = max(existing_count, int(m.group(1))) + existing_keys.add(m.group(2).strip()[:100]) + + # Find new chunks not already in the file + new_chunks = [c for c in chunks if c[:100] not in existing_keys] + if not new_chunks: + print(f" CURRENT: {filename} ({existing_count} passages, no new)", file=sys.stderr) + skipped += 1 + continue + + # Append new passages after existing ones + new_passages = "" + for i, chunk in enumerate(new_chunks, existing_count + 1): + new_passages += f"\n### Passage {i}\n\n{chunk}\n" + + filepath.write_text(content.rstrip() + "\n" + new_passages) + print(f" APPENDED: {filename} (+{len(new_chunks)} passages, {existing_count + len(new_chunks)} total)", file=sys.stderr) + updated += 1 + else: + # Build new passages section + passages = "\n## Cited Passages\n" + for i, chunk in enumerate(chunks, 1): + passages += f"\n### Passage {i}\n\n{chunk}\n" + + filepath.write_text(content.rstrip() + "\n" + passages) + print(f" CREATED: {filename} ({len(chunks)} passages)", file=sys.stderr) + updated += 1 + + print(f"\nDone: {updated} updated, {skipped} skipped", file=sys.stderr) + + # Output passage mapping as JSON for resolve_citations to use + mapping = {} + for sid, chunks in source_chunks.items(): + mapping[sid] = {} + for i, chunk in enumerate(chunks, 1): + mapping[sid][chunk[:100]] = i + json.dump(mapping, sys.stdout, indent=2) + + +if __name__ == "__main__": + main() diff --git a/skills/notebooklm/scripts/import_sources.py b/skills/notebooklm/scripts/import_sources.py new file mode 100644 index 00000000..6a0602c4 --- /dev/null +++ b/skills/notebooklm/scripts/import_sources.py 
@@ -0,0 +1,141 @@ +#!/usr/bin/env python3 +"""Import NotebookLM sources into vault as notebook-source files. + +Usage: + python3 import_sources.py --sources /tmp/sources.json --slug my-notebook --dashboard "Dashboard Title" + python3 import_sources.py --sources /tmp/sources.json --slug my-notebook --dashboard "Dashboard Title" --skip-guides + +Creates one .md file per source with frontmatter + AI-generated source guide. +""" +import argparse +import json +import re +import subprocess +import sys +from pathlib import Path + +VAULT = Path.cwd() # Expected to run from vault root + +# Map NotebookLM type strings to our source_type values +TYPE_MAP = { + "SourceType.YOUTUBE": "youtube", + "SourceType.WEB_PAGE": "web", + "SourceType.PDF": "pdf", + "SourceType.TEXT": "text", + "SourceType.GOOGLE_DOCS": "gdocs", + "SourceType.GOOGLE_SLIDES": "gslides", +} + + +def safe_filename(title: str) -> str: + """Make title safe for filesystem.""" + title = re.sub(r'[/:*?"<>|]', '-', title) + title = re.sub(r'\s+', ' ', title).strip() + if len(title) > 120: + title = title[:120].rstrip(' -') + return title + + +def fetch_guide(source_id: str) -> tuple[str, list[str], list[str]]: + """Fetch AI-generated source guide. 
Returns (summary, topics, keywords).""" + try: + result = subprocess.run( + ["notebooklm", "source", "guide", source_id, "--json"], + capture_output=True, text=True, timeout=60 + ) + if result.returncode != 0: + return "", [], [] + data = json.loads(result.stdout) + return data.get("summary", ""), data.get("topics", []), data.get("keywords", []) + except Exception: + return "", [], [] + + +def main(): + parser = argparse.ArgumentParser(description="Import NotebookLM sources into vault") + parser.add_argument("--sources", required=True, help="Path to notebooklm source list JSON") + parser.add_argument("--slug", required=True, help="Notebook slug (kebab-case folder name)") + parser.add_argument("--dashboard", required=True, help="Dashboard title for related links") + parser.add_argument("--skip-guides", action="store_true", help="Skip fetching AI source guides") + args = parser.parse_args() + + with open(args.sources) as f: + data = json.load(f) + + notebook_id = data.get("notebook_id", "") + sources = data["sources"] + sources_dir = VAULT / "Notes/NotebookLM" / args.slug / "Sources" + sources_dir.mkdir(parents=True, exist_ok=True) + dashboard_path = f"Notes/Dashboards/{args.dashboard}" + + created = 0 + skipped = 0 + + for source in sources: + title = source["title"].strip() + source_id = source["id"] + source_type = TYPE_MAP.get(source["type"], "web") + url = source.get("url") or "" + date = source.get("created_at", "")[:10] + + # Skip sources with garbage titles (multi-URL web pages etc.) 
+ if title == "- YouTube" or len(title) < 3: + print(f" SKIP: '{title}' (bad title)") + skipped += 1 + continue + + filename = safe_filename(title) + ".md" + filepath = sources_dir / filename + + if filepath.exists(): + print(f" EXISTS: {filename}") + skipped += 1 + continue + + # Fetch guide if not skipped + guide_text = "" + keywords = [] + if not args.skip_guides: + print(f" Fetching guide: {title[:60]}...") + summary, topics, keywords = fetch_guide(source_id) + if summary: + guide_text = summary + if topics: + guide_text += "\n\n### Topics\n\n" + ", ".join(topics) + print(f" OK: {len(summary)} chars, {len(keywords)} keywords") + else: + print(f" WARN: no guide returned") + + # Build topics frontmatter + topics_yaml = "" + if keywords: + topics_yaml = "topics:\n" + "\n".join(f' - "[[{k}]]"' for k in keywords) + "\n" + + content = f"""--- +type: notebook-source +source_id: "{source_id}" +notebook_id: "{notebook_id}" +url: "{url}" +source_type: {source_type} +status: active +date: {date} +{topics_yaml}related: + - "[[{dashboard_path}]]" +--- + +# {title} + +## Source Guide + +{guide_text} +""" + + filepath.write_text(content) + print(f" CREATED: {filename}") + created += 1 + + print(f"\nDone: {created} created, {skipped} skipped") + + +if __name__ == "__main__": + main() diff --git a/skills/notebooklm/scripts/load_channel.py b/skills/notebooklm/scripts/load_channel.py new file mode 100644 index 00000000..2f0f6c1f --- /dev/null +++ b/skills/notebooklm/scripts/load_channel.py @@ -0,0 +1,240 @@ +#!/usr/bin/env python3 +"""Scrape YouTube channel videos and bulk-load into NotebookLM. 
+ +Two subcommands: + scrape - Fetch all videos from a YouTube channel via InnerTube API (no deps) + load - Add videos to a NotebookLM notebook in parallel via async API + +Usage: + python3 load_channel.py scrape --channel "https://www.youtube.com/@LennysPodcast" --output /tmp/videos.json + python3 load_channel.py load --videos /tmp/videos.json --notebook --count 200 --concurrency 20 +""" +import argparse +import asyncio +import json +import re +import sys +import time +import urllib.request + + +# ============================================================================= +# Scrape: YouTube channel -> video list JSON +# ============================================================================= + +INNERTUBE_API = "https://www.youtube.com/youtubei/v1/browse" +INNERTUBE_HEADERS = {"Content-Type": "application/json", "User-Agent": "Mozilla/5.0"} +INNERTUBE_CONTEXT = { + "client": {"clientName": "WEB", "clientVersion": "2.20240101.00.00"} +} + + +def resolve_channel_id(channel_url: str) -> str: + """Extract channel ID from a YouTube channel page.""" + req = urllib.request.Request(channel_url, headers={ + "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36", + "Accept-Language": "en-US,en;q=0.9", + }) + html = urllib.request.urlopen(req).read().decode("utf-8") + + for pattern in [ + r'"channelId":"(UC[^"]+)"', + r'"externalId":"(UC[^"]+)"', + r'channel_id=(UC[^&"]+)', + ]: + m = re.search(pattern, html) + if m: + return m.group(1) + + raise ValueError(f"Could not find channel ID in {channel_url}") + + +def innertube_request(body: dict) -> dict: + """Make an InnerTube API request.""" + data = json.dumps(body).encode() + req = urllib.request.Request(INNERTUBE_API, data=data, headers=INNERTUBE_HEADERS) + return json.loads(urllib.request.urlopen(req).read().decode()) + + +def extract_videos(items: list) -> list[dict]: + """Extract video info from InnerTube response items.""" + videos = [] + for item in items: + vr = None + if 
"richItemRenderer" in item: + vr = item["richItemRenderer"].get("content", {}).get("videoRenderer") + elif "gridVideoRenderer" in item: + vr = item["gridVideoRenderer"] + elif "videoRenderer" in item: + vr = item["videoRenderer"] + + if not vr: + continue + + vid_id = vr.get("videoId", "") + title = "" + for run in vr.get("title", {}).get("runs", []): + title += run.get("text", "") + if not title: + title = vr.get("title", {}).get("simpleText", "") + + if vid_id and title: + videos.append({ + "id": vid_id, + "title": title, + "length": vr.get("lengthText", {}).get("simpleText", ""), + "views": vr.get("viewCountText", {}).get("simpleText", ""), + "published": vr.get("publishedTimeText", {}).get("simpleText", ""), + "url": f"https://www.youtube.com/watch?v={vid_id}", + }) + return videos + + +def scrape_channel(channel_url: str) -> list[dict]: + """Scrape all videos from a YouTube channel.""" + channel_id = resolve_channel_id(channel_url) + print(f"Channel ID: {channel_id}", file=sys.stderr) + + body = { + "context": INNERTUBE_CONTEXT, + "browseId": channel_id, + "params": "EgZ2aWRlb3PyBgQKAjoA", # Videos tab + } + resp = innertube_request(body) + + all_videos = [] + tabs = resp.get("contents", {}).get("twoColumnBrowseResultsRenderer", {}).get("tabs", []) + + for tab in tabs: + grid = tab.get("tabRenderer", {}).get("content", {}).get("richGridRenderer", {}) + if not grid: + continue + + items = grid.get("contents", []) + all_videos.extend(extract_videos(items)) + + # Follow continuation tokens + token = None + for item in items: + if "continuationItemRenderer" in item: + token = ( + item["continuationItemRenderer"] + ["continuationEndpoint"] + ["continuationCommand"] + ["token"] + ) + + page = 1 + while token and page < 100: + cont_body = {"context": INNERTUBE_CONTEXT, "continuation": token} + cont_resp = innertube_request(cont_body) + + token = None + for action in cont_resp.get("onResponseReceivedActions", []): + append_items = 
action.get("appendContinuationItemsAction", {}).get("continuationItems", []) + all_videos.extend(extract_videos(append_items)) + + for ci in append_items: + if "continuationItemRenderer" in ci: + token = ( + ci["continuationItemRenderer"] + ["continuationEndpoint"] + ["continuationCommand"] + ["token"] + ) + + page += 1 + print(f"\rPage {page}, videos: {len(all_videos)}", end="", file=sys.stderr) + + print(f"\nTotal: {len(all_videos)} videos", file=sys.stderr) + return all_videos + + +# ============================================================================= +# Load: video list JSON -> NotebookLM notebook +# ============================================================================= + +success_count = 0 +fail_count = 0 +errors = [] + + +async def add_video(client, sem, i, total, video, notebook_id): + """Add a single video to NotebookLM.""" + global success_count, fail_count + url = video["url"] + title = video["title"][:60] + + async with sem: + try: + await client.sources.add_url(notebook_id, url) + success_count += 1 + print(f"[{i}/{total}] OK {title}") + except Exception as e: + fail_count += 1 + err_msg = str(e)[:120] + errors.append({"video": video, "error": err_msg}) + print(f"[{i}/{total}] FAIL {title} | {err_msg}") + + +async def load_videos(videos_file: str, notebook_id: str, count: int, concurrency: int, skip: int): + """Load videos into NotebookLM in parallel.""" + from notebooklm import NotebookLMClient + + videos = json.load(open(videos_file))[skip:skip + count] + total = len(videos) + print(f"Loading {total} videos into {notebook_id} (concurrency={concurrency}, skip={skip})") + + client = await NotebookLMClient.from_storage() + async with client: + sem = asyncio.Semaphore(concurrency) + tasks = [ + add_video(client, sem, i + 1, total, v, notebook_id) + for i, v in enumerate(videos) + ] + await asyncio.gather(*tasks) + + print(f"\nDone: {success_count} OK, {fail_count} failed") + if errors: + with open("/tmp/channel-load-errors.json", "w") as 
f: + json.dump(errors, f, indent=2) + print(f"Errors saved to /tmp/channel-load-errors.json") + + +# ============================================================================= +# CLI +# ============================================================================= + +def main(): + parser = argparse.ArgumentParser(description="YouTube channel -> NotebookLM loader") + sub = parser.add_subparsers(dest="command", required=True) + + # scrape + sp = sub.add_parser("scrape", help="Scrape videos from a YouTube channel") + sp.add_argument("--channel", required=True, help="YouTube channel URL") + sp.add_argument("--output", required=True, help="Output JSON file path") + + # load + lp = sub.add_parser("load", help="Load videos into a NotebookLM notebook") + lp.add_argument("--videos", required=True, help="Path to scraped videos JSON") + lp.add_argument("--notebook", required=True, help="NotebookLM notebook ID") + lp.add_argument("--count", type=int, default=200, help="Number of videos to load (default: 200)") + lp.add_argument("--concurrency", type=int, default=20, help="Parallel requests (default: 20)") + lp.add_argument("--skip", type=int, default=0, help="Skip N most recent videos (default: 0)") + + args = parser.parse_args() + + if args.command == "scrape": + videos = scrape_channel(args.channel) + with open(args.output, "w") as f: + json.dump(videos, f, indent=2) + print(f"Saved {len(videos)} videos to {args.output}") + + elif args.command == "load": + t0 = time.time() + asyncio.run(load_videos(args.videos, args.notebook, args.count, args.concurrency, args.skip)) + print(f"Elapsed: {time.time() - t0:.0f}s") + + +if __name__ == "__main__": + main() diff --git a/skills/notebooklm/scripts/resolve_citations.py b/skills/notebooklm/scripts/resolve_citations.py new file mode 100644 index 00000000..2f87a1eb --- /dev/null +++ b/skills/notebooklm/scripts/resolve_citations.py @@ -0,0 +1,472 @@ +#!/usr/bin/env python3 +"""Resolve [N] citations in NotebookLM answers to clickable 
[[wikilinks]]. + +Usage (nlm CLI output): + python3 resolve_citations.py --qa /tmp/qa.json --sources /tmp/sources.json --slug my-notebook + python3 resolve_citations.py --qa /tmp/qa.json --sources /tmp/sources.json \ + --slug my-notebook --title "My Q&A" \ + --output "Notes/NotebookLM/my-notebook/QA/2026-04-03 My Q&A.md" \ + --vault . + +Input formats: + --qa: nlm notebook query output: {value: {answer, references: [{source_id, citation_number, cited_text}]}} + --sources: nlm source list output: [{id, title, type, url}] + +Each [N] becomes [[Source#^anchor|[N]]] - click jumps to the cited line in the transcript. +""" +import argparse +import hashlib +import json +import re +import sys +from pathlib import Path + + +def safe_filename(title): + # Must match import_sources.py exactly (including the '/' replacement) + safe = re.sub(r'[/:*?"<>|]', '-', title) + safe = re.sub(r'\s+', ' ', safe).strip() + if len(safe) > 120: + safe = safe[:120].rstrip(' -') + return safe + + +def build_source_map(sources_file): + with open(sources_file) as f: + data = json.load(f) + + # nlm format: flat list [{id, title, type, url}] + # old format: {sources: [{id, title, ...}]} + if isinstance(data, list): + sources = data + else: + sources = data.get("sources", []) + + mapping = {} + for s in sources: + title = s["title"].strip() + if title == "- YouTube" or len(title) < 3: + continue + mapping[s["id"]] = safe_filename(title) + return mapping + + +def expand_citation_spec(spec_text): + numbers = [] + for part in spec_text.split(','): + part = part.strip() + if '-' in part: + try: + a, b = part.split('-', 1) + numbers.extend(range(int(a.strip()), int(b.strip()) + 1)) + except ValueError: + continue + else: + try: + numbers.append(int(part)) + except ValueError: + continue + return numbers + + +def _detect_collapsed_citations(answer, references): + """Check if answer citations all share one source_id despite naming multiple sources.""" + answer_cns = set(int(x) for x in re.findall(r'\[(\d+)\]', answer)) + if 
len(answer_cns) < 2: + return False + cn_to_sid = {r["citation_number"]: r["source_id"] for r in references} + used_sids = set(cn_to_sid[n] for n in answer_cns if n in cn_to_sid) + sections = _extract_section_titles(answer) + return len(used_sids) == 1 and len(sections) >= 2 + + +def _extract_section_titles(answer): + """Extract *"episode title"* markers and their positions from answer text.""" + return [(m.start(), m.group(1).strip('"\u201c\u201d')) + for m in re.finditer(r'\*["\u201c](.+?)["\u201d]\*', answer)] + + +def _fuzzy_match_title(extracted_title, source_map): + """Match an extracted episode title to a source_id via substring overlap.""" + best_id = None + best_score = 0 + for sid, raw_title in source_map.items(): + ext_low = extracted_title.lower() + raw_low = raw_title.lower() + if ext_low in raw_low or raw_low in ext_low: + if len(raw_title) > best_score: + best_score = len(raw_title) + best_id = sid + continue + ext_words = set(re.findall(r'\w+', ext_low)) + raw_words = set(re.findall(r'\w+', raw_low)) + if not ext_words: + continue + overlap = len(ext_words & raw_words) / len(ext_words) + if overlap > 0.5 and overlap * len(ext_words) > best_score: + best_score = overlap * len(ext_words) + best_id = sid + return best_id + + +def _build_citation_section_map(answer, references, source_map): + """When citations are collapsed, remap them using episode title sections.""" + sections = _extract_section_titles(answer) + if len(sections) < 2: + return None + + section_sources = [] + for pos, title in sections: + sid = _fuzzy_match_title(title, source_map) + section_sources.append((pos, sid)) + + cite_positions = {} + for m in re.finditer(r'\[(\d+(?:\s*[-,]\s*\d+)*)\]', answer): + pos = m.start() + for part in m.group(1).split(','): + part = part.strip() + if '-' in part: + try: + a, b = part.split('-', 1) + for n in range(int(a.strip()), int(b.strip()) + 1): + if n not in cite_positions: + cite_positions[n] = pos + except ValueError: + continue + else: + try: 
+ n = int(part) + if n not in cite_positions: + cite_positions[n] = pos + except ValueError: + continue + + remap = {} + for cn, cpos in cite_positions.items(): + best_section_sid = None + for spos, sid in reversed(section_sources): + if spos <= cpos: + best_section_sid = sid + break + if best_section_sid: + remap[cn] = best_section_sid + + return remap if remap else None + + +def _make_anchor_id(cited_text): + """Generate a stable anchor ID from cited_text content.""" + h = hashlib.md5(cited_text[:100].encode()).hexdigest()[:8] + return "c-%s" % h + + +def _strip_anchors(text): + """Remove previously injected anchor blocks (\n\n^c-...\n\n) for clean searching.""" + return re.sub(r'\n\n\^c-[0-9a-f]+\n\n', '', text) + + +def _find_text_position(content, cited_text): + """Find the character position of cited_text in content via substring search. + + Tries full text first, then progressively shorter unique substrings. + If anchors were previously injected in the content, strips them for matching + then maps the position back to the real content. + Returns char position or None. 
+ """ + # Normalize cited_text to match file conventions: + # - nlm returns \xa0\n but files may have just \n + # - Replace non-breaking spaces with regular spaces + # Content stays as-is so returned positions are accurate + norm_content = content.replace('\xa0', ' ') + norm_cited = cited_text.replace('\xa0', ' ').strip() + # Also try with whitespace before newlines collapsed (nlm artifact) + norm_cited_collapsed = re.sub(r' +\n', '\n', norm_cited) + + if not norm_cited: + return None + + # Try both normalizations: raw and with trailing spaces before \n collapsed + # (nlm returns "text \nmore" but files may have "text\nmore") + candidates = [norm_cited] + if norm_cited_collapsed != norm_cited: + candidates.append(norm_cited_collapsed) + + for cited in candidates: + # Try direct search first (works when no anchors interrupt the passage) + pos = norm_content.find(cited) + if pos >= 0: + return pos + + # Try shorter unique substrings + for length in [200, 100, 60]: + if len(cited) < length: + continue + snippet = cited[:length] + pos = norm_content.find(snippet) + if pos >= 0: + second = norm_content.find(snippet, pos + 1) + if second < 0: + return pos + + # Fallback: strip existing anchors from content and retry + # This handles the case where a previous question's anchor was injected + # in the middle of this question's cited_text passage + clean = _strip_anchors(norm_content) + if clean != norm_content: + for cited in candidates: + for length in [200, 100, 60]: + if len(cited) < length: + continue + snippet = cited[:length] + clean_pos = clean.find(snippet) + if clean_pos < 0: + continue + # Map clean_pos back to real content position + # Walk through content, skipping anchor blocks, counting real chars + real_pos = 0 + clean_count = 0 + while clean_count < clean_pos and real_pos < len(norm_content): + # Check if we're at an anchor block + m = re.match(r'\n\n\^c-[0-9a-f]+\n\n', norm_content[real_pos:]) + if m: + real_pos += m.end() + else: + real_pos += 1 + 
clean_count += 1 + return real_pos + + return None + + +def _inject_inline_anchors(source_path, anchors_to_inject): + """Inject ^anchor-id tags inline in the transcript at cited_text positions. + + anchors_to_inject: list of (anchor_id, file_position) tuples. + + Splits the content at each anchor position and inserts ^anchor-id on its own + paragraph. Obsidian requires blank lines before AND after ^block-id for indexing, + AND the block-id must reference a distinct paragraph (not a 90K char single line). + """ + content = source_path.read_text() + if not content: + return 0 + + # Strip old ## Cited Passages section if present (migration from old approach) + t_idx = content.find("## Transcript") + cp_idx = content.find("## Cited Passages") + if cp_idx >= 0 and (t_idx < 0 or cp_idx > t_idx): + content = content[:cp_idx].rstrip() + "\n" + + # Sort anchors by position descending so insertions don't shift later positions + sorted_anchors = sorted(anchors_to_inject, key=lambda x: x[1], reverse=True) + added = 0 + + for anchor_id, file_pos in sorted_anchors: + tag = "^%s" % anchor_id + # Skip if anchor already exists anywhere in the file + if tag in content: + continue + + # Split content at the anchor position: insert a paragraph break + anchor + # Don't strip whitespace - rstrip/lstrip can eat spaces between words, + # which breaks future cited_text substring searches that span this split point. 
+ before = content[:file_pos] + after = content[file_pos:] + + # Insert: \n\n^anchor-id\n\n between the two halves + content = before + "\n\n" + tag + "\n\n" + after + added += 1 + + if added > 0: + source_path.write_text(content) + + return added + + +def resolve_answer(answer, references, source_map, slug, vault=None): + cn_map = {} + for ref in references: + cn = ref.get("citation_number") + if cn is not None: + cn_map[cn] = ref + + # Fix collapsed citations: all cited refs share one source_id + remap = None + if _detect_collapsed_citations(answer, references): + remap = _build_citation_section_map(answer, references, source_map) + if remap: + print("REMAP: %d citations remapped via section titles" % len(remap), file=sys.stderr) + + sources_path = "Notes/NotebookLM/%s/Sources" % slug + cited_sources = set() + cited_numbers = set() + + # Build anchor map: citation_number -> (source_title, anchor_id, cited_text, original_source_title) + anchor_map = {} + seen_anchors = {} # anchor_id -> first citation_number (dedup) + for n, ref in cn_map.items(): + cited_text = ref.get("cited_text", "").strip() + if not cited_text: + continue + remapped_sid = remap.get(n, ref["source_id"]) if remap else ref["source_id"] + original_sid = ref["source_id"] + anchor_id = _make_anchor_id(cited_text) + + # If two citations have the same anchor_id, they cite the same passage + if anchor_id in seen_anchors: + pass # still create mapping so the link resolves + seen_anchors[anchor_id] = n + + title = source_map.get(remapped_sid) + original_title = source_map.get(original_sid) + if title and cited_text: + anchor_map[n] = (title, anchor_id, cited_text, original_title) + + def replace_citation(match): + spec = match.group(1) + numbers = expand_citation_spec(spec) + links = [] + for n in numbers: + if n not in anchor_map: + links.append("[%d]" % n) + continue + title, anchor_id, _, original_title = anchor_map[n] + cited_sources.add(title) + cited_numbers.add(n) + if title == original_title: + 
links.append("[[%s/%s#^%s|[%d]]]" % (sources_path, title, anchor_id, n)) + else: + links.append("[[%s/%s|[%d]]]" % (sources_path, title, n)) + return " ".join(links) + + resolved = re.sub( + r'\[(\d+(?:\s*[-,]\s*\d+)*)\]', + replace_citation, + answer + ) + + # Inject inline anchors into source files + if vault: + # Group by ORIGINAL source file (where cited_text actually lives) + by_source = {} + for n in cited_numbers: + title, anchor_id, cited_text, original_title = anchor_map[n] + ref = cn_map[n] + if original_title: + by_source.setdefault(original_title, []).append((anchor_id, cited_text)) + + total_added = 0 + for title, items in by_source.items(): + source_file = vault / sources_path / ("%s.md" % title) + if not source_file.exists(): + print("SKIP: source file not found: %s" % source_file.name, file=sys.stderr) + continue + content = source_file.read_text() + + anchors = [] + seen_positions = set() + for anchor_id, cited_text in items: + file_pos = _find_text_position(content, cited_text) + if file_pos is None: + print("MISS: could not find cited_text for ^%s in %s" % (anchor_id, title), file=sys.stderr) + continue + # Dedup: don't place two anchors at the same position + pos_bucket = file_pos // 200 + if pos_bucket in seen_positions: + continue + seen_positions.add(pos_bucket) + anchors.append((anchor_id, file_pos)) + + if anchors: + count = _inject_inline_anchors(source_file, anchors) + total_added += count + + if total_added > 0: + print("ANCHORS: %d injected across %d source files" % (total_added, len(by_source)), file=sys.stderr) + + stats = { + "total_refs": len(references), + "citations_in_answer": len(cited_numbers), + "cited_sources": len(cited_sources), + "total_sources": len(source_map), + } + + return resolved, sorted(cited_sources), stats + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--qa", required=True) + parser.add_argument("--sources", required=True) + parser.add_argument("--slug", required=True) + 
parser.add_argument("--title") + parser.add_argument("--notebook", help="Display name of notebook file, e.g. 'Lennys Podcast'") + parser.add_argument("--output") + parser.add_argument("--date") + parser.add_argument("--vault", default=".") + args = parser.parse_args() + + vault = Path(args.vault) + source_map = build_source_map(args.sources) + + with open(args.qa) as f: + qa_raw = json.load(f) + + # nlm format: {value: {answer, references, ...}} + # old format: {answer, references, ...} + if "value" in qa_raw: + qa_data = qa_raw["value"] + else: + qa_data = qa_raw + + resolved, cited_sources, stats = resolve_answer( + qa_data["answer"], qa_data["references"], source_map, args.slug, vault=vault + ) + + print("Citations: %d used, %d sources" % (stats['citations_in_answer'], stats['cited_sources']), file=sys.stderr) + + if not args.output: + print(resolved) + return + + if not args.title: + print("ERROR: --title required for --output", file=sys.stderr) + sys.exit(1) + + from datetime import date + note_date = args.date or date.today().isoformat() + sources_path = "Notes/NotebookLM/%s/Sources" % args.slug + + sources_list = "\n".join( + "- [[%s/%s|%s]]" % (sources_path, t, t) for t in cited_sources + ) + + related_block = "" + if args.notebook: + related_block = '\nrelated:\n - "[[Notes/NotebookLM/%s|%s]]"' % (args.notebook, args.notebook) + + content = """--- +type: nlm-query +status: done +date: %s%s +--- + +# %s + +%s + +--- + +## Sources + +%s +""" % (note_date, related_block, args.title, resolved, sources_list) + + output_path = vault / args.output + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_text(content) + print("CREATED: %s (%d sources)" % (args.output, stats['cited_sources']), file=sys.stderr) + + +if __name__ == "__main__": + main() diff --git a/skills/notebooklm/workflows/ask.md b/skills/notebooklm/workflows/ask.md new file mode 100644 index 00000000..d7523ebd --- /dev/null +++ b/skills/notebooklm/workflows/ask.md @@ -0,0 +1,80 
@@ +# Ask Notebook Workflow + +Ask a NotebookLM notebook a question and save the answer as a vault reference note with `[N]` citations linked to source files. + +## Inputs + +- **question**: The question to ask +- **title**: Short title for the Q&A note +- **notebook-slug**: Which notebook folder to use +- **notebook-id**: Full UUID of the notebook (from `nlm notebook list`) +- **dashboard-title** (optional): Dashboard to link to via `related` + +## Step 1: Verify Auth + +```bash +nlm auth status +``` + +If auth error, run `nlm auth login` (browser flow). + +## Step 2: Get Sources + +```bash +nlm source list {notebook-id} --json > /tmp/nlm-sources.json +``` + +## Step 3: Ask the Question + +**Delegate to a Haiku sub-agent** to keep JSON out of main context: + +``` +Agent(model="haiku", prompt=""" +1. Run: nlm notebook query {notebook-id} "{question}" --json > /tmp/qa-output.json +2. Return: answer length, citation count, sources_used count +""") +``` + +Or directly: +```bash +nlm notebook query {notebook-id} "{question}" --json > /tmp/qa-output.json +``` + +## Step 4: Resolve Citations + +```bash +cd /path/to/vault && python3 .claude/skills/notebooklm/scripts/resolve_citations.py \ + --qa /tmp/qa-output.json \ + --sources /tmp/nlm-sources.json \ + --slug {notebook-slug} \ + --title "{title}" \ + --notebook "{Notebook Display Name}" \ + --output "Notes/NotebookLM/{notebook-slug}/QA/{date} {title}.md" \ + --vault . +``` + +The `--notebook` flag sets the `related` wikilink to the notebook index file (e.g. `--notebook "Lennys Podcast"` links to `[[Notes/NotebookLM/Lennys Podcast]]`). + +This replaces `[N]` with `[[Source#^anchor|[N]]]` wikilinks. Click opens the source file and jumps to the cited passage. 
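The link rewriting described above can be sketched roughly as follows. This is a simplified illustration, not the resolver itself: it mirrors the MD5-based anchor ID and the wikilink format used in `resolve_citations.py`, but the function names `make_anchor_id` and `link_citations` and the `cited` mapping are hypothetical, and it handles only single `[N]` markers (the real script also expands ranges and lists such as `[1-3]` or `[1, 2]`).

```python
import hashlib
import re


def make_anchor_id(cited_text: str) -> str:
    """Stable block ID: 'c-' plus 8 hex chars of the MD5 of the first 100 chars."""
    digest = hashlib.md5(cited_text[:100].encode()).hexdigest()[:8]
    return "c-%s" % digest


def link_citations(answer: str, cited: dict, sources_path: str) -> str:
    """Replace each [N] with an Obsidian block-reference wikilink.

    cited maps citation number -> (source_title, cited_text) (hypothetical shape).
    Numbers without an entry are left as plain [N].
    """
    def replace(match):
        n = int(match.group(1))
        if n not in cited:
            return match.group(0)
        title, text = cited[n]
        return "[[%s/%s#^%s|[%d]]]" % (sources_path, title, make_anchor_id(text), n)

    # Only simple [N] markers are handled in this sketch.
    return re.sub(r"\[(\d+)\]", replace, answer)
```

Because the anchor ID depends only on the cited text, re-running the sketch on the same passage yields the same `^c-XXXXXXXX` target, which is what keeps links stable across runs.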
+ +## Step 5: Verify + +```bash +obsidian open path="Notes/NotebookLM/{notebook-slug}/QA/{date} {title}.md" newtab +``` + +**CRITICAL: Click 2-3 citation links in Obsidian to verify they jump to the correct passage.** The resolver reporting "N anchors injected" does NOT mean they work. Obsidian block references can silently fail if formatting is wrong. + +## Cross-Source Citation Handling + +NotebookLM's API returns the same `source_id` for all citations in cross-source synthesis answers. The resolver detects this and remaps citations by parsing `*"episode title"*` section markers in the answer text and fuzzy-matching them to source titles. This works automatically. No single-source workaround needed. + +## Anchor Placement + +Anchors are placed via substring search: the resolver finds the `cited_text` passage in the source file and inserts a `^c-XXXXXXXX` block ID on its own line with blank lines before and after (required by Obsidian for block indexing). Anchor IDs are derived from MD5 hash of the cited_text content, so they're stable across runs. + +## Output + +- Q&A note at `Notes/NotebookLM/{notebook-slug}/QA/{date} {title}.md` +- Inline `[N]` rendered as clickable `[[Source#^anchor|[N]]]` links to cited passages in source files +- Sources list at bottom diff --git a/skills/notebooklm/workflows/auth.md b/skills/notebooklm/workflows/auth.md new file mode 100644 index 00000000..492f7938 --- /dev/null +++ b/skills/notebooklm/workflows/auth.md @@ -0,0 +1,41 @@ +# Auth Workflow + +Authenticate with NotebookLM via browser-based Google login using `notebooklm-py`. + +## When Needed + +Any `notebooklm` command returns auth errors or redirects to accounts.google.com. + +## Step 1: Login + +This opens a Chromium browser for interactive Google login. **User must run this themselves** (requires interactive browser). + +```bash +cd ~/projects/notebooklm-loader && uv run notebooklm login +``` + +Flow: +1. Browser opens to Google login +2. User completes login + 2FA +3. 
Waits until NotebookLM homepage loads +4. User presses Enter in terminal +5. Cookies saved to `~/.notebooklm/storage_state.json` + +## Step 2: Verify + +```bash +cd ~/projects/notebooklm-loader && uv run notebooklm list +``` + +Should show notebook list without errors. + +## Troubleshooting + +**Browser doesn't open:** Playwright chromium may not be installed. Run: +```bash +cd ~/projects/notebooklm-loader && uv run playwright install chromium +``` + +**Wrong Google account:** Login opens a fresh browser profile at `~/.notebooklm/browser_profile`. If you need a different account, delete this folder first and re-login. + +**Auth expires:** Google cookies typically last weeks. Re-run login when commands start failing. diff --git a/skills/notebooklm/workflows/import.md b/skills/notebooklm/workflows/import.md new file mode 100644 index 00000000..81e45e0a --- /dev/null +++ b/skills/notebooklm/workflows/import.md @@ -0,0 +1,105 @@ +# Import Notebook Workflow + +Import all sources from a NotebookLM notebook into the vault as linked `notebook-source` files with AI-generated guides, plus a dashboard. + +## Inputs + +- **notebook-id**: from `notebooklm list` +- **notebook-slug**: short kebab-case name (e.g. `lennys-podcast`) +- **dashboard-title**: human-readable title (e.g. `Lenny's Podcast Research`) + +If user doesn't specify, derive from `notebooklm status` output. + +## Step 1: Verify Auth + +```bash +cd ~/projects/notebooklm-loader && uv run notebooklm status +``` + +If auth error, run [workflows/auth.md](auth.md) first. + +## Step 2: Set Notebook Context + +```bash +cd ~/projects/notebooklm-loader && uv run notebooklm use {notebook-id} +``` + +## Step 3: Export Source List + +```bash +cd ~/projects/notebooklm-loader && uv run notebooklm source list --json > /tmp/notebooklm-sources.json +``` + +Inspect the JSON. Key fields per source: `id`, `title`, `type`, `url`, `created_at`. 
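A quick way to inspect the export is a small summary script. The sketch below is an assumption-laden helper, not part of the skill: `summarize_sources` is a hypothetical name, and it accepts either a flat list of sources or an object with a `sources` key, since the exact top-level shape may differ between tools.

```python
import json
from collections import Counter

# Key fields named in this step.
KEY_FIELDS = ("id", "title", "type", "url", "created_at")


def summarize_sources(data):
    """Summarize a source export: total count, type counts, missing-field counts.

    Accepts a flat list of source dicts or a dict holding them under "sources".
    """
    sources = data["sources"] if isinstance(data, dict) else data
    types = Counter(str(s.get("type", "unknown")) for s in sources)
    missing = {field: sum(1 for s in sources if not s.get(field)) for field in KEY_FIELDS}
    return {"count": len(sources), "types": dict(types), "missing": missing}


def summarize_file(path):
    """Load a JSON export from disk and summarize it."""
    with open(path) as f:
        return summarize_sources(json.load(f))
```

Calling `summarize_file("/tmp/notebooklm-sources.json")` gives a compact view of how many sources were exported and which key fields are absent before the import script runs.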
+ +**FORMAT NOTE:** `nlm source list --json` returns a flat array `[{id, title, type, url}]`. The `import_sources.py` script expects a wrapper `{notebook_id: "...", sources: [...]}` with `type` as `SourceType.YOUTUBE` etc. If using `nlm` output, wrap it first: + +```python +import json +sources = json.load(open("/tmp/notebooklm-sources.json")) +TYPE_MAP = {"youtube": "SourceType.YOUTUBE", "web": "SourceType.WEB_PAGE", "pdf": "SourceType.PDF"} +wrapped = {"notebook_id": "{notebook-id}", "sources": [{ + **s, "type": TYPE_MAP.get(s.get("type",""), f"SourceType.{s.get('type','WEB_PAGE').upper()}"), + "created_at": s.get("created_at", "") +} for s in sources]} +json.dump(wrapped, open("/tmp/notebooklm-sources-wrapped.json", "w")) +``` + +## Step 4: Create Folder Structure + +```bash +mkdir -p "Notes/NotebookLM/{notebook-slug}/Sources" +mkdir -p "Notes/NotebookLM/{notebook-slug}/QA" +``` + +## Step 5: Create Source Files + +```bash +python3 .claude/skills/notebooklm/scripts/import_sources.py \ + --sources /tmp/notebooklm-sources.json \ + --slug {notebook-slug} \ + --dashboard "{dashboard-title}" +``` + +If you wrapped `nlm` output in Step 3, pass `/tmp/notebooklm-sources-wrapped.json` to `--sources` instead. + +Add `--skip-guides` to skip AI source guide fetching (faster; guides can be added later). + +This creates one `.md` file per source with proper frontmatter and a `related` link to the dashboard. + +## Step 6: Create Dashboard + +Create `Notes/Dashboards/{dashboard-title}.md` using the dashboard template from `Templates/Types/dashboard.md`.
+ +Add these Dataview queries: + +```markdown +## Sources + +\```dataview +TABLE source_type AS "Type", url AS "URL", date AS "Added" +FROM "Notes/NotebookLM/{notebook-slug}/Sources" +WHERE type = "notebook-source" +SORT date DESC +\``` + +## Q&A Log + +\```dataview +TABLE date AS "Date", source AS "Source" +FROM "Notes/NotebookLM/{notebook-slug}/QA" +WHERE type = "nlm-query" +SORT date DESC +\``` +``` + +## Step 7: Verify + +```bash +ls "Notes/NotebookLM/{notebook-slug}/Sources/" | wc -l +obsidian open path="Notes/Dashboards/{dashboard-title}.md" newtab +``` + +## Output + +- Source files in `Notes/NotebookLM/{notebook-slug}/Sources/` +- Dashboard at `Notes/Dashboards/{dashboard-title}.md` +- Empty `QA/` folder ready for ask workflow diff --git a/skills/notebooklm/workflows/youtube-channel.md b/skills/notebooklm/workflows/youtube-channel.md new file mode 100644 index 00000000..b0ab03d8 --- /dev/null +++ b/skills/notebooklm/workflows/youtube-channel.md @@ -0,0 +1,137 @@ +# YouTube Channel Loader Workflow + +Bulk-load videos from a YouTube channel into a NotebookLM notebook. Video discovery uses the InnerTube API and needs no external dependencies; upload uses the notebooklm-py async API for parallel requests. + +## Inputs + +- **channel_url**: YouTube channel URL (e.g. `https://www.youtube.com/@LennysPodcast`) +- **notebook_name**: Name for the NotebookLM notebook +- **count**: Number of most recent videos to load (default: 200, max: 300 per notebook) + +## Step 1: Scrape Channel Videos + +```bash +python3 .claude/skills/notebooklm/scripts/load_channel.py scrape \ + --channel "https://www.youtube.com/@ChannelName" \ + --output /tmp/channel-videos.json +``` + +This: +1. Resolves the channel handle to a channel ID via page HTML +2. Uses InnerTube browse API to paginate through all videos +3. Follows continuation tokens until all videos are fetched +4. Saves JSON array: `[{id, title, length, views, published, url}, ...]` +5. Videos are ordered most recent first + +No API key needed.
No external dependencies. Pure stdlib. + +## Step 2: Create Notebook + +```bash +cd ~/projects/notebooklm-loader && uv run notebooklm create "{notebook_name}" +``` + +Note the notebook ID from the output. + +## Step 3: Load Videos + +```bash +cd ~/projects/notebooklm-loader && uv run python3 \ + scripts/load_channel.py load \ + --videos /tmp/channel-videos.json \ + --notebook {notebook-id} \ + --count {count} \ + --concurrency 20 +``` + +This: +1. Reads the video list JSON +2. Opens async NotebookLM client (uses `~/.notebooklm/storage_state.json`) +3. Fires `add_url` for each video with 20 concurrent requests +4. Reports progress and saves errors to `/tmp/channel-load-errors.json` + +**Performance:** ~200 videos in ~75 seconds with concurrency=20. + +## Step 4: Fetch Transcript Content + +**CRITICAL:** Source files created by the import workflow (Step 6) are empty stubs, so run this step after importing. Citations will ALL fail unless the source files contain the actual transcript content. + +```bash +# Fetch content for all sources (threaded, ~2 min for 200 sources) +python3 /tmp/fetch_transcripts.py +``` + +Or inline: +```python +import json, subprocess +from pathlib import Path +from concurrent.futures import ThreadPoolExecutor, as_completed + +SOURCES_DIR = Path("Notes/NotebookLM/{notebook-slug}/Sources") +sources = json.load(open("/tmp/nlm-sources.json")) + +# Build source_id -> file mapping +id_to_file = {} +for f in SOURCES_DIR.glob("*.md"): + for line in f.read_text().split("\n"): + if line.startswith("source_id:"): + sid = line.split('"')[1] + id_to_file[sid] = f + break + +def fetch(sid): + r = subprocess.run(["nlm", "source", "content", sid, "--json"], + capture_output=True, text=True, timeout=120) + if r.returncode != 0: return sid, None + return sid, json.loads(r.stdout).get("value", {}).get("content", "") + +with ThreadPoolExecutor(max_workers=10) as ex: + for future in as_completed(ex.submit(fetch, s["id"]) for s in sources if s["id"] in id_to_file): + sid, content = future.result() + if content: + f =
id_to_file[sid] + f.write_text(f.read_text() + f"\n## Transcript\n\n{content}\n") +``` + +`nlm source content --json` returns `{value: {content: "..."}}`. Some sources may return empty content if they are still processing on NotebookLM's end (retry after a few minutes). + +## Step 5: Verify + +```bash +cd ~/projects/notebooklm-loader && uv run notebooklm use {notebook-id} +cd ~/projects/notebooklm-loader && uv run notebooklm source list --json | python3 -c "import json,sys; d=json.load(sys.stdin); s=d['sources'] if isinstance(d, dict) else d; print(f'{len(s)} sources loaded')" +``` + +## Step 6 (Optional): Import to Vault + +Follow [workflows/import.md](import.md) to create vault source files and a dashboard. + +## Limits + +- **300 sources per notebook.** For channels with more than 300 videos, create multiple notebooks (e.g. `channel-part-1`, `channel-part-2`). +- **Some videos may fail** if they're private, deleted, or region-locked. Check `/tmp/channel-load-errors.json`. +- **Processing time:** After upload, NotebookLM indexes each video server-side. This takes a few minutes. Sources may show as "processing" initially. + +## Example: Full Pipeline + +```bash +# 1. Scrape +python3 .claude/skills/notebooklm/scripts/load_channel.py scrape \ + --channel "https://www.youtube.com/@LennysPodcast" \ + --output /tmp/lennys-videos.json + +# 2. Create notebook +cd ~/projects/notebooklm-loader && uv run notebooklm create "Lenny's Podcast" +# -> 2a18a53d-be60-4951-af17-8b7303dc097e + +# 3. Load 200 most recent +cd ~/projects/notebooklm-loader && uv run python3 \ + /path/to/load_channel.py load \ + --videos /tmp/lennys-videos.json \ + --notebook 2a18a53d-be60-4951-af17-8b7303dc097e \ + --count 200 + +# 4. Ask questions +cd ~/projects/notebooklm-loader && uv run notebooklm use 2a18a53d-... +cd ~/projects/notebooklm-loader && uv run notebooklm ask "What are the top product management frameworks discussed?" +```
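For channels that exceed the 300-source limit noted under Limits, the scraped video list can be split into per-notebook batches before loading. The following is a minimal sketch under that assumption; `plan_notebooks` is a hypothetical helper, not part of `load_channel.py`, and the `-part-N` naming simply follows the example names given in the Limits section.

```python
MAX_SOURCES_PER_NOTEBOOK = 300  # limit stated in the Limits section


def plan_notebooks(videos, base_name, batch_size=MAX_SOURCES_PER_NOTEBOOK):
    """Split a video list into (notebook_name, batch) pairs.

    Each batch holds at most batch_size videos, in the original order.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    batches = [videos[i:i + batch_size] for i in range(0, len(videos), batch_size)]
    return [
        ("%s-part-%d" % (base_name, number), batch)
        for number, batch in enumerate(batches, start=1)
    ]
```

Each batch could then be written to its own JSON file and passed to the `load` command via `--videos`, with a separate notebook created for every part.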