# Todoist Sync Script
The Python daemon that two-way-syncs your Task notes with a Todoist project. Save as `todoist_sync.py` in your vault's `Scripts/` folder (see [[09 Todoist Integration]]).
```python
#!/usr/bin/env python3
"""
Obsidian <-> Todoist Sync — two-way Obsidian ↔ Todoist sync for Task notes.
Synchronizes the following per task:
- Title (Obsidian filename after the `YYYY-MM-DD Task - ` prefix ↔ Todoist `content`)
- Due date (Obsidian `due:` frontmatter ↔ Todoist `due.date`)
- Status open/done (Obsidian `status:` ↔ Todoist completed state)
- Body content (everything below frontmatter ↔ Todoist `description`)
- Creation and deletion: a new task on either side appears on the other.
Architecture:
- Inotify-style file watcher (watchdog) handles Obsidian → Todoist near-instantly.
- Background poller calls the Todoist Sync API every PULL_INTERVAL seconds to
detect Todoist → Obsidian changes (incrementally via sync_token).
- A state file (~/.config/obsidian-todoist-sync/state.json) caches the last-synced
hash on each side per task, plus the Todoist sync_token and project_id.
- Conflict policy: approximate last-write-wins. If both sides changed since the
last sync, the Obsidian note wins only if its file was modified within the
2 seconds before the pull processes it; otherwise Todoist wins.
- A 5-second "just-wrote" cache suppresses watcher events for files we just
modified from Todoist, preventing sync loops.
Run modes:
- Default: starts file watcher + Todoist poller (this is what launchd uses).
- --once: a single sync sweep in both directions, then exits.
Config: ~/.config/obsidian-todoist-sync/config.json
State: ~/.config/obsidian-todoist-sync/state.json
Logs: ~/.config/obsidian-todoist-sync/sync.log
See Scripts/setup-todoist-sync.md for installation instructions.
"""
import argparse
import hashlib
import json
import logging
import re
import sys
import threading
import time
import urllib.parse
import uuid
from datetime import datetime, timezone
from pathlib import Path
import frontmatter
import requests
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
# === Per-vault constants (substituted at deploy time) ===
VAULT_ROOT = Path("/Users/YOUR_USERNAME/Library/Mobile Documents/iCloud~md~obsidian/Documents/YOUR_VAULT")
CONFIG_NAME = "obsidian-todoist-sync"
DEFAULT_PROJECT_NAME = "Obsidian"

# Derived locations: the watched notes folder and the per-user config directory.
TASKS_DIR = VAULT_ROOT.joinpath("Activities", "Tasks")
CONFIG_DIR = Path.home().joinpath(".config", CONFIG_NAME)
CONFIG_FILE = CONFIG_DIR.joinpath("config.json")
STATE_FILE = CONFIG_DIR.joinpath("state.json")
LOG_FILE = CONFIG_DIR.joinpath("sync.log")

# === Tunable constants ===
# Todoist sunset the legacy REST v2 and Sync v9 APIs on 2026-02-10. Everything
# now lives under the unified v1 base: https://api.todoist.com/api/v1/
TODOIST_API = "https://api.todoist.com/api/v1"
TODOIST_SYNC = TODOIST_API + "/sync"
DELETED_LABEL = "deleted-from-obsidian"  # label applied to tasks whose note was deleted
PULL_INTERVAL_SEC = 30  # seconds between Todoist polls
DEBOUNCE_SEC = 0.6  # watcher events closer together than this are dropped
JUST_WROTE_TTL_SEC = 5.0  # how long our own writes suppress watcher events
FILENAME_MAX_LEN = 200  # cap on the title portion of a note filename

# Characters illegal in filenames on macOS/iCloud
FILENAME_BAD_CHARS = re.compile(r'[\/\\:\*\?"<>\|]')

log = logging.getLogger("todoist-sync")
def setup_logging():
    """Create the config directory if needed and send INFO+ logs to LOG_FILE."""
    CONFIG_DIR.mkdir(parents=True, exist_ok=True)
    log_options = {
        "filename": LOG_FILE,
        "level": logging.INFO,
        "format": "%(asctime)s %(levelname)s: %(message)s",
    }
    logging.basicConfig(**log_options)
# === Config & state ===
def load_config():
    """Return the parsed JSON config stored at CONFIG_FILE.

    When the file is missing, setup instructions are written to stderr and the
    process exits with status 1.
    """
    if CONFIG_FILE.exists():
        return json.loads(CONFIG_FILE.read_text())

    instructions = "".join(
        [
            f"\nConfig not found at {CONFIG_FILE}\n",
            "Create it with the following content (replace YOUR_TOKEN_HERE):\n\n",
            " {\n",
            ' "todoist_token": "YOUR_TOKEN_HERE",\n',
            f' "project_name": "{DEFAULT_PROJECT_NAME}"\n',
            " }\n\n",
            "Then chmod 600 it so only you can read it.\n",
        ]
    )
    sys.stderr.write(instructions)
    sys.exit(1)
def load_state():
    """Load the persisted sync state, falling back to an empty state.

    Returns a dict with at least ``sync_token`` and ``tasks``. A missing file
    yields a fresh state; an unreadable or malformed file is logged and also
    replaced by a fresh state.
    """

    def _fresh():
        return {
            "project_id": None,
            "sync_token": "*",
            "tasks": {},  # str(todoist_id) -> {path, title, due, status, hash}
        }

    if not STATE_FILE.exists():
        return _fresh()
    try:
        loaded = json.loads(STATE_FILE.read_text())
        # Backfill keys that older state files may lack.
        for key, default in (("sync_token", "*"), ("tasks", {})):
            loaded.setdefault(key, default)
    except Exception as e:
        log.error(f"State file corrupt ({e}); starting fresh")
        return _fresh()
    return loaded
# Lock the state file across the watcher thread and poller thread. It is
# re-entrant because callers such as push_note invoke save_state while already
# holding it.
_state_lock = threading.RLock()


def save_state(state):
    """Persist ``state`` as indented JSON to STATE_FILE.

    The JSON is written to a sibling temporary file which then replaces
    STATE_FILE in one rename, so an interruption mid-write cannot leave a
    truncated state file behind (load_state would discard such a file and
    start fresh, losing every task mapping).
    """
    tmp_path = STATE_FILE.with_name(STATE_FILE.name + ".tmp")
    with _state_lock:
        # Serialize under the lock so other lock holders cannot mutate the
        # dict while it is being dumped.
        tmp_path.write_text(json.dumps(state, indent=2))
        tmp_path.replace(STATE_FILE)
# === Just-wrote suppression (prevents sync loops) ===
# Maps str(path) -> time.time() of this process's most recent write to it.
_just_wrote: dict[str, float] = {}
_just_wrote_lock = threading.Lock()


def mark_just_wrote(path: Path):
    """Record that this process has just written (or renamed to) ``path``."""
    key = str(path)
    with _just_wrote_lock:
        _just_wrote[key] = time.time()


def was_just_written(path: Path) -> bool:
    """Return True if ``path`` was marked within the last JUST_WROTE_TTL_SEC seconds.

    An expired mark is removed from the cache as a side effect of the lookup.
    """
    key = str(path)
    with _just_wrote_lock:
        stamp = _just_wrote.get(key)
        if stamp is None:
            return False
        age = time.time() - stamp
        if age < JUST_WROTE_TTL_SEC:
            return True
        # Stale entry: forget it so the cache does not keep it around.
        del _just_wrote[key]
        return False
# === Todoist API ===
class Todoist:
    """Small client for the Todoist HTTP API, sharing one authenticated session."""

    def __init__(self, token):
        """Store ``token`` and open a session that sends it as a Bearer header."""
        self.token = token
        session = requests.Session()
        session.headers.update({"Authorization": f"Bearer {token}"})
        self._session = session

    def _request(self, method, path, json_body=None, params=None):
        """Send one request to ``TODOIST_API + path`` and return the decoded JSON.

        Requests carrying a JSON body also get a fresh ``X-Request-Id``.
        Raises on HTTP error statuses; an empty response body yields ``{}``.
        """
        if json_body is None:
            extra_headers = {}
        else:
            extra_headers = {
                "Content-Type": "application/json",
                "X-Request-Id": uuid.uuid4().hex,
            }
        response = self._session.request(
            method,
            f"{TODOIST_API}{path}",
            json=json_body,
            params=params,
            headers=extra_headers,
            timeout=30,
        )
        response.raise_for_status()
        if not response.text:
            return {}
        return response.json()

    def _paginated(self, path, params=None):
        """Yield every item of a list endpoint, following ``next_cursor`` pages.

        Pages are expected as {"results": [...], "next_cursor": "..."}; a
        missing or empty cursor ends the iteration. A bare list response is
        yielded as-is and treated as the only page.
        """
        query = dict(params or {})
        next_cursor = None
        while True:
            if next_cursor:
                query["cursor"] = next_cursor
            page = self._request("GET", path, params=query)
            if isinstance(page, list):  # defensive — old shape
                yield from page
                return
            yield from page.get("results", [])
            next_cursor = page.get("next_cursor")
            if not next_cursor:
                return

    def sync(self, sync_token, resource_types=None):
        """POST to the sync endpoint and return its decoded JSON response.

        ``resource_types`` defaults to ["items"] and is sent JSON-encoded as a
        form field alongside ``sync_token``.
        """
        wanted = resource_types or ["items"]
        form = {"sync_token": sync_token, "resource_types": json.dumps(wanted)}
        response = self._session.post(TODOIST_SYNC, data=form, timeout=60)
        response.raise_for_status()
        return response.json()

    def get_or_create_project(self, name, state):
        """Return the id of project ``name``, creating the project if absent.

        The id is cached in ``state["project_id"]`` and persisted; a cached id
        is returned without contacting the API.
        """
        known_id = state.get("project_id")
        if known_id:
            return known_id

        existing = next(
            (proj for proj in self._paginated("/projects") if proj.get("name") == name),
            None,
        )
        if existing is not None:
            project_id = existing["id"]
            state["project_id"] = project_id
            save_state(state)
            log.info(f"Using existing Todoist project '{name}' (id={project_id})")
            return project_id

        created = self._request("POST", "/projects", {"name": name})
        project_id = created["id"]
        state["project_id"] = project_id
        save_state(state)
        log.info(f"Created Todoist project '{name}' (id={project_id})")
        return project_id

    def create_task(self, payload):
        """Create a task from ``payload`` and return the API response."""
        return self._request("POST", "/tasks", json_body=payload)

    def update_task(self, tid, payload):
        """Apply ``payload`` to task ``tid`` and return the API response."""
        endpoint = f"/tasks/{tid}"
        return self._request("POST", endpoint, json_body=payload)

    def close_task(self, tid):
        """Mark task ``tid`` as completed."""
        endpoint = f"/tasks/{tid}/close"
        return self._request("POST", endpoint)

    def reopen_task(self, tid):
        """Reopen the completed task ``tid``."""
        endpoint = f"/tasks/{tid}/reopen"
        return self._request("POST", endpoint)
# === Note utilities ===
def parse_note(path):
    """Parse the note at ``path`` into ``(metadata, body)``.

    Any failure while loading is logged and reported as ``(None, None)``.
    """
    try:
        note = frontmatter.load(str(path))
        parsed = (note.metadata, note.content)
    except Exception as e:
        log.error(f"Failed to parse {path}: {e}")
        parsed = (None, None)
    return parsed
def filename_to_title(path: Path) -> str:
    """Return the task title encoded in a note filename.

    The leading 'YYYY-MM-DD Task - ' prefix is removed from the stem; a stem
    without that prefix is returned unchanged.
    """
    stem = path.stem
    prefixed = re.match(r"\d{4}-\d{2}-\d{2}\s+Task\s+-\s+(.+)", stem)
    if prefixed is None:
        return stem
    return prefixed.group(1)
def safe_filename_component(s: str) -> str:
    """Turn a title into a filename-safe fragment.

    Characters matched by FILENAME_BAD_CHARS are removed, surrounding
    whitespace is stripped, and the result is capped at FILENAME_MAX_LEN
    characters. An empty result becomes "Untitled".
    """
    cleaned = FILENAME_BAD_CHARS.sub("", s).strip()
    # Slicing is a no-op for short strings; rstrip removes whitespace that a
    # cut in the middle of the text may have exposed.
    clipped = cleaned[:FILENAME_MAX_LEN].rstrip()
    return clipped if clipped else "Untitled"
def make_obsidian_url(path: Path) -> str:
    """Build an ``obsidian://open`` URL for a note inside the vault.

    ``path`` must be located under VAULT_ROOT; the vault name and the
    vault-relative path are percent-encoded.
    """
    relative = path.relative_to(VAULT_ROOT)
    vault_part = urllib.parse.quote(VAULT_ROOT.name)
    file_part = urllib.parse.quote(str(relative))
    return f"obsidian://open?vault={vault_part}&file={file_part}"
def hash_state(title, due, status, body):
    """Return a SHA-256 hex digest summarizing a task's synced fields.

    Falsy fields are normalized first (empty string for title, due and body;
    "open" for status) so that ``None`` and ``""`` hash identically.
    """
    normalized = {
        "title": title if title else "",
        "due": str(due) if due else "",
        "status": status if status else "open",
        "body": body if body else "",
    }
    canonical = json.dumps(normalized, sort_keys=True, ensure_ascii=False)
    digest = hashlib.sha256()
    digest.update(canonical.encode("utf-8"))
    return digest.hexdigest()
def read_note_fields(path: Path):
    """Extract the synced fields of a task note.

    Returns ``(title, due_str, status, body, todoist_id)``, or ``None`` when
    the note cannot be parsed or its frontmatter ``type`` is not "task".
    ``due_str`` is the first 10 characters of the ``due`` value and
    ``todoist_id`` is a string; either is ``None`` when unset.
    """
    metadata, body = parse_note(path)
    if metadata is None or metadata.get("type") != "task":
        return None

    due_value = metadata.get("due")
    due_str = None if due_value in (None, "") else str(due_value)[:10]

    # A literal "null" string in the frontmatter also counts as "no id".
    id_value = metadata.get("todoist_id")
    todoist_id = None if id_value in (None, "", "null") else str(id_value)

    return (
        filename_to_title(path),
        due_str,
        metadata.get("status", "open"),
        body or "",
        todoist_id,
    )
def write_frontmatter_field(path: Path, key: str, value):
    """Inject or replace a frontmatter scalar field, preserving frontmatter ordering.

    Every existing ``key:`` line inside the frontmatter is rewritten in place;
    if there is none, the field is appended as the last frontmatter line.
    ``None`` and ``""`` are written as an empty value. The file is left
    untouched (with a warning) when it has no frontmatter block. After a
    successful write the path is marked as just-written.
    """
    text = path.read_text(encoding="utf-8")
    m = re.match(r"^(---\n)(.*?\n)(---\n)", text, re.DOTALL)
    if not m:
        log.warning(f"No frontmatter in {path}; cannot write {key}")
        return
    front_open, front_body, front_close = m.groups()
    value_str = "" if value in (None, "") else str(value)
    new_line = f"{key}: {value_str}"
    # NOTE: use `.*$` not `\s*.*$` — \s includes newlines, which on the last
    # frontmatter line would consume the trailing \n and the substitution would
    # merge the next line (the closing `---`) onto the same line, breaking
    # Obsidian's frontmatter detection.
    pattern = rf"^{re.escape(key)}:.*$"
    if re.search(pattern, front_body, re.MULTILINE):
        # A callable replacement inserts new_line literally; a plain string
        # would have its backslashes interpreted as regex escapes.
        front_body = re.sub(
            pattern, lambda _match: new_line, front_body, flags=re.MULTILINE
        )
    else:
        if not front_body.endswith("\n"):
            front_body += "\n"
        front_body += f"{new_line}\n"
    new_text = front_open + front_body + front_close + text[m.end():]
    path.write_text(new_text, encoding="utf-8")
    mark_just_wrote(path)
def write_full_note(path: Path, frontmatter_fields: dict, body: str):
    """Create or overwrite ``path`` with a frontmatter block followed by ``body``.

    Fields are emitted as ``key: value`` lines in dict order, with ``None``
    and ``""`` written as an empty value. The file always ends with a newline,
    and the path is marked as just-written afterwards.
    """
    header = ["---"]
    header.extend(
        f"{key}: " if value is None or value == "" else f"{key}: {value}"
        for key, value in frontmatter_fields.items()
    )
    # The trailing empty element makes the join end with a newline after the
    # closing fence.
    header.extend(["---", ""])

    content = body if body else ""
    terminator = "" if (body or "").endswith("\n") else "\n"
    path.write_text("\n".join(header) + content + terminator, encoding="utf-8")
    mark_just_wrote(path)
def replace_body(path: Path, new_body: str):
    """Swap the note's body (everything after the frontmatter) for ``new_body``.

    A non-empty body is newline-terminated; an empty body leaves only the
    frontmatter. Files without a frontmatter block are left untouched and a
    warning is logged. The path is marked as just-written after the write.
    """
    original = path.read_text(encoding="utf-8")
    fence = re.match(r"^(---\n.*?\n---\n)", original, re.DOTALL)
    if fence is None:
        log.warning(f"No frontmatter in {path}; cannot replace body")
        return

    tail = new_body if new_body else ""
    if tail and not tail.endswith("\n"):
        tail += "\n"
    path.write_text(fence.group(1) + tail, encoding="utf-8")
    mark_just_wrote(path)
def rename_note_for_title(old_path: Path, new_title: str) -> Path:
    """Rename a Task note so its filename carries ``new_title``.

    The existing 'YYYY-MM-DD Task - ' prefix is kept; a note without one gets
    today's date as prefix. Returns the path the note has afterwards, which is
    ``old_path`` when the name is already correct or the target name is taken.
    """
    safe_title = safe_filename_component(new_title)
    dated = re.match(r"(\d{4}-\d{2}-\d{2}\s+Task\s+-\s+).+", old_path.stem)
    if dated:
        prefix = dated.group(1)
    else:
        prefix = f"{datetime.now().strftime('%Y-%m-%d')} Task - "
    new_name = f"{prefix}{safe_title}.md"
    candidate = old_path.with_name(new_name)

    if candidate == old_path:
        return old_path
    if candidate.exists():
        # Never overwrite another note; keep the old name instead.
        log.warning(f"Cannot rename {old_path.name} → {new_name}: target exists")
        return old_path

    old_path.rename(candidate)
    mark_just_wrote(candidate)
    log.info(f"RENAME {old_path.name} → {new_name}")
    return candidate
# === Push: Obsidian → Todoist ===
def push_note(path: Path, state, todoist: "Todoist", project_id):
    """Push one Obsidian note's current state to Todoist.

    A missing file is handled as a local deletion. Non-task notes are ignored.
    A note without ``todoist_id`` gets a new Todoist task and has the id
    written back into its frontmatter. A note with an id is compared against
    the cached state and only the differences (close/reopen, field update)
    are sent. API failures are logged and leave the cached state unchanged.
    """
    if not path.exists():
        handle_local_deletion(path, state, todoist)
        return

    fields = read_note_fields(path)
    if fields is None:
        return
    title, due, status, body, todoist_id = fields
    new_hash = hash_state(title, due, status, body)

    payload = {"content": title, "description": body, "project_id": project_id}
    # With no due date, send the "no date" due string instead.
    payload.update({"due_date": due} if due else {"due_string": "no date"})

    # What gets cached for this task once the push has succeeded.
    snapshot = {
        "path": str(path),
        "title": title,
        "due": due,
        "status": status,
        "hash": new_hash,
    }

    with _state_lock:
        if not todoist_id:
            # First time this note is seen: create the remote task.
            try:
                created = todoist.create_task(payload)
                todoist_id = str(created["id"])
                write_frontmatter_field(path, "todoist_id", todoist_id)
                if status == "done":
                    todoist.close_task(todoist_id)
                state["tasks"][todoist_id] = snapshot
                save_state(state)
                log.info(f"CREATE {todoist_id} {path.name} (Obsidian → Todoist)")
            except Exception as e:
                log.error(f"Failed to create Todoist task for {path}: {e}")
            return

        cached = state["tasks"].get(todoist_id, {})
        cached_hash = cached.get("hash")
        cached_status = cached.get("status", "open")

        content_changed = new_hash != cached_hash
        status_changed = status != cached_status
        path_changed = cached.get("path") != str(path)
        if not (content_changed or status_changed or path_changed):
            return  # no change

        try:
            # status transitions
            if status == "done" and cached_status != "done":
                todoist.close_task(todoist_id)
                log.info(f"CLOSE {todoist_id} {path.name} (Obsidian → Todoist)")
            elif status == "open" and cached_status == "done":
                todoist.reopen_task(todoist_id)
                log.info(f"REOPEN {todoist_id} {path.name} (Obsidian → Todoist)")
            # field updates
            if content_changed:
                todoist.update_task(todoist_id, payload)
                log.info(f"UPDATE {todoist_id} {path.name} (Obsidian → Todoist)")
        except Exception as e:
            log.error(f"Failed to push {todoist_id}: {e}")
            return

        state["tasks"][todoist_id] = snapshot
        save_state(state)
def handle_local_deletion(path: Path, state, todoist: "Todoist"):
    """React to a deleted note by labeling and closing its Todoist task.

    The first cached task whose path equals ``path`` is given DELETED_LABEL
    and closed; its mapping is then dropped from the state even if the API
    calls failed. Untracked paths are ignored.
    """
    wanted = str(path)
    with _state_lock:
        tid = next(
            (
                task_id
                for task_id, info in state["tasks"].items()
                if info.get("path") == wanted
            ),
            None,
        )
        if tid is None:
            return
        try:
            todoist.update_task(tid, {"labels": [DELETED_LABEL]})
            todoist.close_task(tid)
            log.info(
                f"DELETE {tid} {path.name} (Obsidian → Todoist; labeled & closed)"
            )
        except Exception as e:
            log.error(f"Failed to mark {tid} as deleted: {e}")
        del state["tasks"][tid]
        save_state(state)
# === Pull: Todoist → Obsidian ===
def pull_from_todoist(state, todoist: "Todoist", project_id):
    """Run one Sync API pull and apply the returned items locally.

    Items from other projects are skipped, and any cached mapping for them is
    dropped. The new sync token is stored and the state saved even when no
    items came back. Returns the number of items that caused a local change;
    a failed sync call is logged and counts as 0.
    """
    try:
        result = todoist.sync(state.get("sync_token", "*"))
    except Exception as e:
        log.error(f"Sync API call failed: {e}")
        return 0

    items = result.get("items", [])
    new_token = result.get("sync_token", state.get("sync_token", "*"))
    our_project = str(project_id)

    applied = 0
    with _state_lock:
        for item in items or []:
            if str(item.get("project_id")) != our_project:
                # Not in our project. If we have a state entry for it (e.g., user
                # moved it out of the project in Todoist), drop the local mapping.
                tid = str(item.get("id"))
                if tid in state["tasks"]:
                    log.info(f"DROP {tid} (moved out of project)")
                    del state["tasks"][tid]
                continue
            try:
                if apply_remote_item(item, state):
                    applied += 1
            except Exception as e:
                log.exception(f"Failed to apply Todoist item {item.get('id')}: {e}")
        state["sync_token"] = new_token
        save_state(state)
    return applied
def apply_remote_item(item: dict, state) -> bool:
    """Apply one Todoist item to the vault. Returns True if something changed.

    Handles, in order: remote deletion (the note is kept but its
    ``todoist_id`` is cleared), items carrying DELETED_LABEL (ignored),
    tracked items (updated unless the remote is unchanged or the local side
    wins a conflict), and untracked items (a new note is created).
    """
    tid = str(item["id"])
    remote_deleted = bool(item.get("is_deleted"))
    remote_done = bool(item.get("checked")) or bool(item.get("is_completed"))
    title = item.get("content", "")
    description = item.get("description", "") or ""
    due = (item.get("due") or {}).get("date")
    if due:
        due = due[:10]  # keep only the first 10 characters (YYYY-MM-DD)
    labels = item.get("labels") or []

    tasks = state["tasks"]
    cached = tasks.get(tid)

    # Deletion on Todoist side
    if remote_deleted:
        if not cached:
            return False
        path = Path(cached["path"])
        log.info(f"REMOTE-DELETE {tid} → leaving local note {path.name} intact")
        # Clear the todoist_id so the user can re-sync if desired
        if path.exists():
            write_frontmatter_field(path, "todoist_id", "")
        del tasks[tid]
        return True

    # Skip items marked deleted-from-obsidian (we already deleted them locally)
    if DELETED_LABEL in labels:
        return False

    target_status = "done" if remote_done else "open"
    remote_hash = hash_state(title, due, target_status, description)

    if not cached:
        # New on Todoist, not in our state. Create a local note.
        return create_local_from_remote(
            item, title, due, target_status, description, state, tid
        )

    path = Path(cached["path"])
    cached_hash = cached.get("hash")
    if not path.exists():
        # Local note was deleted but we never saw the deletion event. Drop.
        log.info(f"REMOTE-PRESENT {tid} but local note missing; dropping mapping")
        del tasks[tid]
        return True

    # Check if Obsidian side has diverged from cache
    local_fields = read_note_fields(path)
    if local_fields is None:
        return False
    local_hash = hash_state(*local_fields[:4])

    if remote_hash == cached_hash:
        return False  # No remote change; push thread will handle local

    if local_hash != cached_hash:
        # Conflict: both sides changed. The remote change is treated as
        # happening "now", so the local note wins only if its file was
        # modified within the last 2 seconds.
        if path.stat().st_mtime > time.time() - 2.0:
            log.info(
                f"CONFLICT {tid} {path.name} → Obsidian wins (local mtime newer)"
            )
            return False  # push thread will resolve
        log.info(f"CONFLICT {tid} {path.name} → Todoist wins")

    # Apply remote → local
    apply_remote_to_path(
        path, item, title, due, target_status, description, state, tid
    )
    return True
def apply_remote_to_path(
    path: Path, item: dict, title: str, due, status: str, body: str, state, tid: str
):
    """Overwrite the note at ``path`` with the remote task's fields.

    Renames the file if the title differs, rewrites the ``due``, ``status``
    and ``closed`` frontmatter fields, replaces the body, and caches the new
    state under ``tid``. The caller is responsible for saving the state.
    """
    # Update title (rename file)
    if filename_to_title(path) != title:
        path = rename_note_for_title(path, title)

    # A done task records its completion date, falling back to today's UTC
    # date when the item has no ``completed_at``; an open task clears the field.
    if status == "done":
        completed = item.get("completed_at") or datetime.now(timezone.utc).strftime(
            "%Y-%m-%d"
        )
        closed_on = str(completed)[:10]
    else:
        closed_on = ""

    for key, value in (("due", due or ""), ("status", status), ("closed", closed_on)):
        write_frontmatter_field(path, key, value)

    # Replace body
    replace_body(path, body)

    state["tasks"][tid] = {
        "path": str(path),
        "title": title,
        "due": due,
        "status": status,
        "hash": hash_state(title, due, status, body),
    }
    log.info(f"PULL {tid} {path.name} (Todoist → Obsidian)")
def create_local_from_remote(item, title, due, status, body, state, tid):
    """Create a new Task note in TASKS_DIR for a task that exists only on Todoist.

    The file is named '<today> Task - <title>.md', with ' (2)', ' (3)', ...
    appended until the name is unused. The new mapping is cached under
    ``tid``; the caller is responsible for saving the state. Always returns
    True.
    """
    today = datetime.now().strftime("%Y-%m-%d")
    stem = f"{today} Task - {safe_filename_component(title)}"

    target = TASKS_DIR / f"{stem}.md"
    attempt = 1
    while target.exists():
        attempt += 1
        target = TASKS_DIR / f"{stem} ({attempt}).md"

    if status == "done":
        closed_on = (item.get("completed_at") or "")[:10]
    else:
        closed_on = ""

    write_full_note(
        target,
        {
            "type": "task",
            "date": today,
            "status": status,
            "due": due or "",
            "closed": closed_on,
            "people": "",
            "projects": "",
            "todoist_id": tid,
        },
        body,
    )
    state["tasks"][tid] = {
        "path": str(target),
        "title": title,
        "due": due,
        "status": status,
        "hash": hash_state(title, due, status, body),
    }
    log.info(f"CREATE {tid} {target.name} (Todoist → Obsidian)")
    return True
# === Filesystem watcher ===
class TaskHandler(FileSystemEventHandler):
    """Watchdog handler that forwards changes to ``.md`` files to Todoist."""

    def __init__(self, state, todoist, project_id):
        """Keep the shared state, the API client and the target project id."""
        self.state = state
        self.todoist = todoist
        self.project_id = project_id
        # key -> time.time() of the last event seen for that key
        self._last_event: dict[str, float] = {}

    def _debounce(self, key: str) -> bool:
        """Return True if ``key`` had an event less than DEBOUNCE_SEC ago.

        The timestamp is refreshed on every call, including suppressed ones.
        """
        now = time.time()
        previous = self._last_event.get(key, 0.0)
        self._last_event[key] = now
        return (now - previous) < DEBOUNCE_SEC

    def _push(self, src: str):
        """Push the note at ``src`` unless it is filtered out.

        Skipped are non-``.md`` paths, files this process just wrote, and
        events arriving within the debounce window.
        """
        path = Path(src)
        if (
            path.suffix != ".md"
            or was_just_written(path)
            or self._debounce(str(path))
        ):
            return
        time.sleep(0.3)  # let the file finish writing
        try:
            push_note(path, self.state, self.todoist, self.project_id)
        except Exception as e:
            log.exception(f"Unhandled error while syncing {path}: {e}")

    def on_created(self, event):
        """Push newly created files; directories are ignored."""
        if event.is_directory:
            return
        self._push(event.src_path)

    def on_modified(self, event):
        """Push modified files; directories are ignored."""
        if event.is_directory:
            return
        self._push(event.src_path)

    def on_deleted(self, event):
        """Propagate the deletion of a ``.md`` file to Todoist."""
        if event.is_directory:
            return
        path = Path(event.src_path)
        if path.suffix != ".md" or was_just_written(path):
            return
        try:
            handle_local_deletion(path, self.state, self.todoist)
        except Exception as e:
            log.exception(f"Unhandled error during deletion of {path}: {e}")

    def on_moved(self, event):
        """Re-point the cached mapping at a moved note, then push it."""
        if event.is_directory:
            return
        old_key = str(event.src_path)
        new_path = Path(event.dest_path)
        if new_path.suffix != ".md":
            return
        with _state_lock:
            moved = next(
                (
                    info
                    for info in self.state["tasks"].values()
                    if info.get("path") == old_key
                ),
                None,
            )
            if moved is not None:
                moved["path"] = str(new_path)
                save_state(self.state)
        self._push(event.dest_path)
# === Top-level orchestration ===
def initial_sweep(state, todoist, project_id):
    """Push every ``.md`` file in TASKS_DIR to Todoist once, in sorted order.

    Errors on individual notes are logged and do not stop the sweep. The
    logged count covers every file whose push call returned without raising.
    """
    log.info("Running initial Obsidian → Todoist sweep")
    pushed = 0
    for note in sorted(TASKS_DIR.glob("*.md")):
        try:
            push_note(note, state, todoist, project_id)
        except Exception as e:
            log.exception(f"Sweep error on {note}: {e}")
        else:
            pushed += 1
    log.info(f"Initial sweep complete ({pushed} notes)")
def pull_loop(state, todoist, project_id, stop_event: threading.Event):
    """Poll Todoist every PULL_INTERVAL_SEC seconds until ``stop_event`` is set.

    Errors raised by a pull are logged and the loop keeps running.
    """
    log.info(f"Pull loop started (every {PULL_INTERVAL_SEC}s)")
    while not stop_event.is_set():
        try:
            applied = pull_from_todoist(state, todoist, project_id)
        except Exception as e:
            log.exception(f"Pull loop error: {e}")
        else:
            if applied:
                log.info(f"Applied {applied} Todoist → Obsidian change(s)")
        # Doubles as the sleep: returns early once the event is set.
        stop_event.wait(PULL_INTERVAL_SEC)
def main():
    """Entry point: run the startup sync, then watch and poll until interrupted.

    With ``--once`` the process exits after the initial push sweep and a
    single pull. Exits with status 1 when the vault root does not exist.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "--once", action="store_true", help="Run one bidirectional sweep and exit"
    )
    args = cli.parse_args()

    setup_logging()
    log.info("=== todoist_sync (two-way) starting ===")
    if not VAULT_ROOT.exists():
        log.error(f"Vault root does not exist: {VAULT_ROOT}")
        sys.exit(1)
    TASKS_DIR.mkdir(parents=True, exist_ok=True)

    config = load_config()
    state = load_state()
    todoist = Todoist(config["todoist_token"])
    project_id = todoist.get_or_create_project(
        config.get("project_name", DEFAULT_PROJECT_NAME), state
    )

    # Startup sync: local notes first, then remote changes.
    initial_sweep(state, todoist, project_id)
    pull_from_todoist(state, todoist, project_id)
    if args.once:
        log.info("Exiting after one-shot sweep")
        return

    stop_event = threading.Event()
    poller = threading.Thread(
        target=pull_loop,
        args=(state, todoist, project_id, stop_event),
        daemon=True,
    )
    poller.start()

    log.info(f"Watching {TASKS_DIR}")
    handler = TaskHandler(state, todoist, project_id)
    observer = Observer()
    observer.schedule(handler, str(TASKS_DIR), recursive=False)
    observer.start()

    # Idle in the main thread; the watcher and poller do the work.
    try:
        while True:
            time.sleep(60)
    except KeyboardInterrupt:
        log.info("Interrupted, stopping watcher and poller")
        stop_event.set()
        observer.stop()
    observer.join()
    poller.join(timeout=5)


if __name__ == "__main__":
    main()
```