# Todoist Sync Script The Python daemon that two-way-syncs your Task notes with a Todoist project. Save as `todoist_sync.py` in your vault's `Scripts/` folder (see [[09 Todoist Integration]]). ```python #!/usr/bin/env python3 """ Obsidian <-> Todoist Sync — two-way Obsidian ↔ Todoist sync for Task notes. Synchronizes the following per task: - Title (Obsidian filename after the `YYYY-MM-DD Task - ` prefix ↔ Todoist `content`) - Due date (Obsidian `due:` frontmatter ↔ Todoist `due.date`) - Status open/done (Obsidian `status:` ↔ Todoist completed state) - Body content (everything below frontmatter ↔ Todoist `description`) - Creation and deletion: a new task on either side appears on the other. Architecture: - Inotify-style file watcher (watchdog) handles Obsidian → Todoist near-instantly. - Background poller calls the Todoist Sync API every PULL_INTERVAL seconds to detect Todoist → Obsidian changes (incrementally via sync_token). - A state file (~/.config/obsidian-todoist-sync/state.json) caches the last-synced hash on each side per task, plus the Todoist sync_token and project_id. - Conflict policy: last-write-wins. If both sides changed since the last sync, whichever side's modification timestamp is newer wins. Ties go to Todoist. - A 5-second "just-wrote" cache suppresses watcher events for files we just modified from Todoist, preventing sync loops. Run modes: - Default: starts file watcher + Todoist poller (this is what launchd uses). - --once: a single sync sweep in both directions, then exits. Config: ~/.config/obsidian-todoist-sync/config.json State: ~/.config/obsidian-todoist-sync/state.json Logs: ~/.config/obsidian-todoist-sync/sync.log See Scripts/setup-todoist-sync.md for installation instructions. """ import argparse import hashlib import json import logging import re import sys import threading import time import urllib.parse import uuid from datetime import datetime, timezone from pathlib import Path import frontmatter import requests from watchdog.events import FileSystemEventHandler from watchdog.observers import Observer # === Per-vault constants (substituted at deploy time) === VAULT_ROOT = Path("/Users/YOUR_USERNAME/Library/Mobile Documents/iCloud~md~obsidian/Documents/YOUR_VAULT") CONFIG_NAME = "obsidian-todoist-sync" DEFAULT_PROJECT_NAME = "Obsidian" TASKS_DIR = VAULT_ROOT / "Activities" / "Tasks" CONFIG_DIR = Path.home() / ".config" / CONFIG_NAME CONFIG_FILE = CONFIG_DIR / "config.json" STATE_FILE = CONFIG_DIR / "state.json" LOG_FILE = CONFIG_DIR / "sync.log" # === Tunable constants === # Todoist sunset the legacy REST v2 and Sync v9 APIs on 2026-02-10. Everything # now lives under the unified v1 base: https://api.todoist.com/api/v1/ TODOIST_API = "https://api.todoist.com/api/v1" TODOIST_SYNC = f"{TODOIST_API}/sync" DELETED_LABEL = "deleted-from-obsidian" PULL_INTERVAL_SEC = 30 DEBOUNCE_SEC = 0.6 JUST_WROTE_TTL_SEC = 5.0 FILENAME_MAX_LEN = 200 # Characters illegal in filenames on macOS/iCloud FILENAME_BAD_CHARS = re.compile(r'[\/\\:\*\?"<>\|]') log = logging.getLogger("todoist-sync") def setup_logging(): CONFIG_DIR.mkdir(parents=True, exist_ok=True) logging.basicConfig( filename=LOG_FILE, level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s", ) # === Config & state === def load_config(): if not CONFIG_FILE.exists(): sys.stderr.write( f"\nConfig not found at {CONFIG_FILE}\n" "Create it with the following content (replace YOUR_TOKEN_HERE):\n\n" " {\n" ' "todoist_token": "YOUR_TOKEN_HERE",\n' f' "project_name": "{DEFAULT_PROJECT_NAME}"\n' " }\n\n" "Then chmod 600 it so only you can read it.\n" ) sys.exit(1) return json.loads(CONFIG_FILE.read_text()) def load_state(): if not STATE_FILE.exists(): return { "project_id": None, "sync_token": "*", "tasks": {}, # str(todoist_id) -> {path, title, due, status, body_hash} } try: s = json.loads(STATE_FILE.read_text()) s.setdefault("sync_token", "*") s.setdefault("tasks", {}) return s except Exception as e: log.error(f"State file corrupt ({e}); starting fresh") return {"project_id": None, "sync_token": "*", "tasks": {}} # Lock the state file across the watcher thread and poller thread. _state_lock = threading.RLock() def save_state(state): with _state_lock: STATE_FILE.write_text(json.dumps(state, indent=2)) # === Just-wrote suppression (prevents sync loops) === _just_wrote: dict[str, float] = {} _just_wrote_lock = threading.Lock() def mark_just_wrote(path: Path): with _just_wrote_lock: _just_wrote[str(path)] = time.time() def was_just_written(path: Path) -> bool: with _just_wrote_lock: ts = _just_wrote.get(str(path)) if ts is None: return False if time.time() - ts < JUST_WROTE_TTL_SEC: return True del _just_wrote[str(path)] return False # === Todoist API === class Todoist: def __init__(self, token): self.token = token self._session = requests.Session() self._session.headers.update({"Authorization": f"Bearer {token}"}) def _request(self, method, path, json_body=None, params=None): headers = {} if json_body is not None: headers["Content-Type"] = "application/json" headers["X-Request-Id"] = uuid.uuid4().hex resp = self._session.request( method, f"{TODOIST_API}{path}", json=json_body, params=params, headers=headers, timeout=30, ) resp.raise_for_status() return resp.json() if resp.text else {} def _paginated(self, path, params=None): """Iterate over all pages of a list endpoint, yielding individual items. Todoist v1 list endpoints return {"results": [...], "next_cursor": "..."} where next_cursor is null/missing on the last page. """ cursor = None params = dict(params or {}) while True: if cursor: params["cursor"] = cursor data = self._request("GET", path, params=params) if isinstance(data, list): # defensive — old shape yield from data return for item in data.get("results", []): yield item cursor = data.get("next_cursor") if not cursor: return def sync(self, sync_token, resource_types=None): resource_types = resource_types or ["items"] data = {"sync_token": sync_token, "resource_types": json.dumps(resource_types)} resp = self._session.post(TODOIST_SYNC, data=data, timeout=60) resp.raise_for_status() return resp.json() def get_or_create_project(self, name, state): if state.get("project_id"): return state["project_id"] for p in self._paginated("/projects"): if p.get("name") == name: state["project_id"] = p["id"] save_state(state) log.info(f"Using existing Todoist project '{name}' (id={p['id']})") return p["id"] new_p = self._request("POST", "/projects", {"name": name}) state["project_id"] = new_p["id"] save_state(state) log.info(f"Created Todoist project '{name}' (id={new_p['id']})") return new_p["id"] def create_task(self, payload): return self._request("POST", "/tasks", payload) def update_task(self, tid, payload): return self._request("POST", f"/tasks/{tid}", payload) def close_task(self, tid): return self._request("POST", f"/tasks/{tid}/close") def reopen_task(self, tid): return self._request("POST", f"/tasks/{tid}/reopen") # === Note utilities === def parse_note(path): """Return (metadata_dict, body_text) or (None, None) on failure.""" try: post = frontmatter.load(str(path)) return post.metadata, post.content except Exception as e: log.error(f"Failed to parse {path}: {e}") return None, None def filename_to_title(path: Path) -> str: """Strip the 'YYYY-MM-DD Task - ' prefix from a Task note's filename.""" m = re.match(r"\d{4}-\d{2}-\d{2}\s+Task\s+-\s+(.+)", path.stem) return m.group(1) if m else path.stem def safe_filename_component(s: str) -> str: s = FILENAME_BAD_CHARS.sub("", s).strip() if len(s) > FILENAME_MAX_LEN: s = s[:FILENAME_MAX_LEN].rstrip() return s or "Untitled" def make_obsidian_url(path: Path) -> str: rel = path.relative_to(VAULT_ROOT) return ( f"obsidian://open?vault={urllib.parse.quote(VAULT_ROOT.name)}" f"&file={urllib.parse.quote(str(rel))}" ) def hash_state(title, due, status, body): blob = { "title": title or "", "due": str(due) if due else "", "status": status or "open", "body": body or "", } return hashlib.sha256( json.dumps(blob, sort_keys=True, ensure_ascii=False).encode("utf-8") ).hexdigest() def read_note_fields(path: Path): """Return (title, due_str, status, body, todoist_id) or None if not a task note.""" md, body = parse_note(path) if md is None or md.get("type") != "task": return None title = filename_to_title(path) due_raw = md.get("due") due_str = str(due_raw)[:10] if due_raw not in (None, "") else None status = md.get("status", "open") raw_tid = md.get("todoist_id") tid = str(raw_tid) if raw_tid not in (None, "", "null") else None return title, due_str, status, body or "", tid def write_frontmatter_field(path: Path, key: str, value): """Inject or replace a frontmatter scalar field, preserving frontmatter ordering.""" text = path.read_text(encoding="utf-8") m = re.match(r"^(---\n)(.*?\n)(---\n)", text, re.DOTALL) if not m: log.warning(f"No frontmatter in {path}; cannot write {key}") return front_open, front_body, front_close = m.groups() value_str = "" if value in (None, "") else str(value) # NOTE: use `.*
not `\s*.*
— \s includes newlines, which on the last # frontmatter line would consume the trailing \n and the substitution would # merge the next line (the closing `---`) onto the same line, breaking # Obsidian's frontmatter detection. pattern = rf"^{re.escape(key)}:.*quot; if re.search(pattern, front_body, re.MULTILINE): front_body = re.sub( pattern, f"{key}: {value_str}", front_body, flags=re.MULTILINE ) else: if not front_body.endswith("\n"): front_body += "\n" front_body += f"{key}: {value_str}\n" new_text = front_open + front_body + front_close + text[m.end() :] path.write_text(new_text, encoding="utf-8") mark_just_wrote(path) def write_full_note(path: Path, frontmatter_fields: dict, body: str): """Write a complete note from scratch (for tasks created on Todoist side).""" lines = ["---"] for k, v in frontmatter_fields.items(): if v is None or v == "": lines.append(f"{k}: ") else: lines.append(f"{k}: {v}") lines.append("---") lines.append("") text = "\n".join(lines) + (body if body else "") + ("\n" if not (body or "").endswith("\n") else "") path.write_text(text, encoding="utf-8") mark_just_wrote(path) def replace_body(path: Path, new_body: str): """Replace everything after the frontmatter close fence with new_body.""" text = path.read_text(encoding="utf-8") m = re.match(r"^(---\n.*?\n---\n)", text, re.DOTALL) if not m: log.warning(f"No frontmatter in {path}; cannot replace body") return new_text = m.group(1) + (new_body if new_body else "") if new_body and not new_body.endswith("\n"): new_text += "\n" path.write_text(new_text, encoding="utf-8") mark_just_wrote(path) def rename_note_for_title(old_path: Path, new_title: str) -> Path: """Rename a Task note to match a new title, preserving the date prefix.""" safe = safe_filename_component(new_title) m = re.match(r"(\d{4}-\d{2}-\d{2}\s+Task\s+-\s+).+", old_path.stem) prefix = m.group(1) if m else f"{datetime.now().strftime('%Y-%m-%d')} Task - " new_name = f"{prefix}{safe}.md" new_path = old_path.with_name(new_name) if new_path == old_path: return old_path if new_path.exists(): log.warning(f"Cannot rename {old_path.name} → {new_name}: target exists") return old_path old_path.rename(new_path) mark_just_wrote(new_path) log.info(f"RENAME {old_path.name} → {new_name}") return new_path # === Push: Obsidian → Todoist === def push_note(path: Path, state, todoist: "Todoist", project_id): """Sync a single Obsidian note to Todoist.""" if not path.exists(): handle_local_deletion(path, state, todoist) return fields = read_note_fields(path) if fields is None: return title, due, status, body, todoist_id = fields new_hash = hash_state(title, due, status, body) payload = {"content": title, "description": body, "project_id": project_id} if due: payload["due_date"] = due else: payload["due_string"] = "no date" with _state_lock: if not todoist_id: try: t = todoist.create_task(payload) todoist_id = str(t["id"]) write_frontmatter_field(path, "todoist_id", todoist_id) if status == "done": todoist.close_task(todoist_id) state["tasks"][todoist_id] = { "path": str(path), "title": title, "due": due, "status": status, "hash": new_hash, } save_state(state) log.info(f"CREATE {todoist_id} {path.name} (Obsidian → Todoist)") except Exception as e: log.error(f"Failed to create Todoist task for {path}: {e}") return cached = state["tasks"].get(todoist_id, {}) cached_hash = cached.get("hash") cached_status = cached.get("status", "open") if ( new_hash == cached_hash and status == cached_status and cached.get("path") == str(path) ): return # no change try: # status transitions if status == "done" and cached_status != "done": todoist.close_task(todoist_id) log.info(f"CLOSE {todoist_id} {path.name} (Obsidian → Todoist)") elif status == "open" and cached_status == "done": todoist.reopen_task(todoist_id) log.info(f"REOPEN {todoist_id} {path.name} (Obsidian → Todoist)") # field updates if new_hash != cached_hash: todoist.update_task(todoist_id, payload) log.info(f"UPDATE {todoist_id} {path.name} (Obsidian → Todoist)") except Exception as e: log.error(f"Failed to push {todoist_id}: {e}") return state["tasks"][todoist_id] = { "path": str(path), "title": title, "due": due, "status": status, "hash": new_hash, } save_state(state) def handle_local_deletion(path: Path, state, todoist: "Todoist"): with _state_lock: path_str = str(path) for tid, info in list(state["tasks"].items()): if info.get("path") == path_str: try: todoist.update_task(tid, {"labels": [DELETED_LABEL]}) todoist.close_task(tid) log.info( f"DELETE {tid} {path.name} (Obsidian → Todoist; labeled & closed)" ) except Exception as e: log.error(f"Failed to mark {tid} as deleted: {e}") del state["tasks"][tid] save_state(state) return # === Pull: Todoist → Obsidian === def pull_from_todoist(state, todoist: "Todoist", project_id): """Single Sync API pull and apply changes locally. Returns count of changes applied.""" try: result = todoist.sync(state.get("sync_token", "*")) except Exception as e: log.error(f"Sync API call failed: {e}") return 0 items = result.get("items", []) new_token = result.get("sync_token", state.get("sync_token", "*")) if not items: with _state_lock: state["sync_token"] = new_token save_state(state) return 0 changes = 0 with _state_lock: for item in items: if str(item.get("project_id")) != str(project_id): # Not in our project. If we have a state entry for it (e.g., user # moved it out of the project in Todoist), drop the local mapping. tid = str(item.get("id")) if tid in state["tasks"]: log.info(f"DROP {tid} (moved out of project)") del state["tasks"][tid] continue try: if apply_remote_item(item, state): changes += 1 except Exception as e: log.exception(f"Failed to apply Todoist item {item.get('id')}: {e}") state["sync_token"] = new_token save_state(state) return changes def apply_remote_item(item: dict, state) -> bool: """Apply a single Todoist item update to Obsidian. Returns True if something changed.""" tid = str(item["id"]) is_deleted = bool(item.get("is_deleted")) is_completed = bool(item.get("checked")) or bool(item.get("is_completed")) title = item.get("content", "") description = item.get("description", "") or "" due_obj = item.get("due") or {} due = due_obj.get("date") if due_obj else None if due: due = due[:10] labels = item.get("labels") or [] cached = state["tasks"].get(tid) # Deletion on Todoist side if is_deleted: if cached: path = Path(cached["path"]) log.info(f"REMOTE-DELETE {tid} → leaving local note {path.name} intact") # Clear the todoist_id so the user can re-sync if desired if path.exists(): write_frontmatter_field(path, "todoist_id", "") del state["tasks"][tid] return True return False # Skip items marked deleted-from-obsidian (we already deleted them locally) if DELETED_LABEL in labels: return False target_status = "done" if is_completed else "open" remote_hash = hash_state(title, due, target_status, description) if cached: path = Path(cached["path"]) cached_hash = cached.get("hash") if not path.exists(): # Local note was deleted but we never saw the deletion event. Drop. log.info(f"REMOTE-PRESENT {tid} but local note missing; dropping mapping") del state["tasks"][tid] return True # Check if Obsidian side has diverged from cache local_fields = read_note_fields(path) if local_fields is None: return False local_title, local_due, local_status, local_body, _ = local_fields local_hash = hash_state(local_title, local_due, local_status, local_body) local_changed = local_hash != cached_hash remote_changed = remote_hash != cached_hash if not remote_changed: return False # No remote change; push thread will handle local if local_changed and remote_changed: # Conflict — LWW. Compare local mtime with poll-cycle start (we just # detected the remote change, so treat remote as "now"). If local # mtime is in the future relative to ~1s ago, prefer local; else remote. local_mtime = path.stat().st_mtime now = time.time() if local_mtime > now - 2.0: # Local change is "just now" — keep local log.info( f"CONFLICT {tid} {path.name} → Obsidian wins (local mtime newer)" ) return False # push thread will resolve else: log.info(f"CONFLICT {tid} {path.name} → Todoist wins") # fall through to apply remote # Apply remote → local apply_remote_to_path(path, item, title, due, target_status, description, state, tid) return True # New on Todoist, not in our state. Create a local note. return create_local_from_remote(item, title, due, target_status, description, state, tid) def apply_remote_to_path( path: Path, item: dict, title: str, due, status: str, body: str, state, tid: str ): # Update title (rename file) cur_title = filename_to_title(path) if cur_title != title: path = rename_note_for_title(path, title) # Update due write_frontmatter_field(path, "due", due or "") # Update status fields write_frontmatter_field(path, "status", status) if status == "done": completed = item.get("completed_at") or datetime.now(timezone.utc).strftime("%Y-%m-%d") write_frontmatter_field(path, "closed", str(completed)[:10]) else: write_frontmatter_field(path, "closed", "") # Replace body replace_body(path, body) state["tasks"][tid] = { "path": str(path), "title": title, "due": due, "status": status, "hash": hash_state(title, due, status, body), } log.info(f"PULL {tid} {path.name} (Todoist → Obsidian)") def create_local_from_remote(item, title, due, status, body, state, tid): today = datetime.now().strftime("%Y-%m-%d") safe = safe_filename_component(title) base_name = f"{today} Task - {safe}.md" target = TASKS_DIR / base_name suffix = 2 while target.exists(): target = TASKS_DIR / f"{today} Task - {safe} ({suffix}).md" suffix += 1 frontmatter_fields = { "type": "task", "date": today, "status": status, "due": due or "", "closed": (item.get("completed_at") or "")[:10] if status == "done" else "", "people": "", "projects": "", "todoist_id": tid, } write_full_note(target, frontmatter_fields, body) state["tasks"][tid] = { "path": str(target), "title": title, "due": due, "status": status, "hash": hash_state(title, due, status, body), } log.info(f"CREATE {tid} {target.name} (Todoist → Obsidian)") return True # === Filesystem watcher === class TaskHandler(FileSystemEventHandler): def __init__(self, state, todoist, project_id): self.state = state self.todoist = todoist self.project_id = project_id self._last_event: dict[str, float] = {} def _debounce(self, key: str) -> bool: now = time.time() last = self._last_event.get(key, 0.0) self._last_event[key] = now return (now - last) < DEBOUNCE_SEC def _push(self, src: str): path = Path(src) if path.suffix != ".md": return if was_just_written(path): return if self._debounce(str(path)): return time.sleep(0.3) # let the file finish writing try: push_note(path, self.state, self.todoist, self.project_id) except Exception as e: log.exception(f"Unhandled error while syncing {path}: {e}") def on_created(self, event): if not event.is_directory: self._push(event.src_path) def on_modified(self, event): if not event.is_directory: self._push(event.src_path) def on_deleted(self, event): if event.is_directory: return path = Path(event.src_path) if path.suffix != ".md": return if was_just_written(path): return try: handle_local_deletion(path, self.state, self.todoist) except Exception as e: log.exception(f"Unhandled error during deletion of {path}: {e}") def on_moved(self, event): if event.is_directory: return old = str(event.src_path) new_path = Path(event.dest_path) if new_path.suffix != ".md": return with _state_lock: for info in self.state["tasks"].values(): if info.get("path") == old: info["path"] = str(new_path) save_state(self.state) break self._push(event.dest_path) # === Top-level orchestration === def initial_sweep(state, todoist, project_id): log.info("Running initial Obsidian → Todoist sweep") count = 0 for p in sorted(TASKS_DIR.glob("*.md")): try: push_note(p, state, todoist, project_id) count += 1 except Exception as e: log.exception(f"Sweep error on {p}: {e}") log.info(f"Initial sweep complete ({count} notes)") def pull_loop(state, todoist, project_id, stop_event: threading.Event): log.info(f"Pull loop started (every {PULL_INTERVAL_SEC}s)") while not stop_event.is_set(): try: changed = pull_from_todoist(state, todoist, project_id) if changed: log.info(f"Applied {changed} Todoist → Obsidian change(s)") except Exception as e: log.exception(f"Pull loop error: {e}") stop_event.wait(PULL_INTERVAL_SEC) def main(): parser = argparse.ArgumentParser() parser.add_argument( "--once", action="store_true", help="Run one bidirectional sweep and exit" ) args = parser.parse_args() setup_logging() log.info("=== todoist_sync (two-way) starting ===") if not VAULT_ROOT.exists(): log.error(f"Vault root does not exist: {VAULT_ROOT}") sys.exit(1) TASKS_DIR.mkdir(parents=True, exist_ok=True) config = load_config() state = load_state() todoist = Todoist(config["todoist_token"]) project_name = config.get("project_name", DEFAULT_PROJECT_NAME) project_id = todoist.get_or_create_project(project_name, state) initial_sweep(state, todoist, project_id) pull_from_todoist(state, todoist, project_id) if args.once: log.info("Exiting after one-shot sweep") return stop_event = threading.Event() poller = threading.Thread( target=pull_loop, args=(state, todoist, project_id, stop_event), daemon=True ) poller.start() log.info(f"Watching {TASKS_DIR}") observer = Observer() observer.schedule( TaskHandler(state, todoist, project_id), str(TASKS_DIR), recursive=False ) observer.start() try: while True: time.sleep(60) except KeyboardInterrupt: log.info("Interrupted, stopping watcher and poller") stop_event.set() observer.stop() observer.join() poller.join(timeout=5) if __name__ == "__main__": main() ```