#!/usr/bin/env python3
"""
Unified Nextcloud CalDAV CLI.

All-in-one tool for calendar management:
- list calendars
- events (view with filters: today, date, range, text search)
- add (create new events, with optional recurrence)
- update (modify existing events)
- delete (remove events)
- test (verify connection)

Environment variables (required):
    NEXTCLOUD_URL, NEXTCLOUD_USER, NEXTCLOUD_PASSWORD, CALDAV_PRINCIPAL
"""

import os
import sys
import argparse
import re
import urllib.request
import urllib.error
import urllib.parse
import uuid
import json
from datetime import datetime, date, timedelta, timezone
import xml.etree.ElementTree as ET
from xml.sax.saxutils import escape as xml_escape

# Read config from environment (no hardcoding)
NEXTCLOUD_URL = os.getenv('NEXTCLOUD_URL', '').rstrip('/')
NEXTCLOUD_USER = os.getenv('NEXTCLOUD_USER', '')
NEXTCLOUD_PASSWORD = os.getenv('NEXTCLOUD_PASSWORD', '')
CALDAV_PRINCIPAL = os.getenv('CALDAV_PRINCIPAL', '')

NS_DAV = '{DAV:}'
NS_CALDAV = '{urn:ietf:params:xml:ns:caldav}'

# NOTE(review): the request bodies below were blank in the reviewed source.
# They are reconstructed from RFC 4791 and from the properties the response
# parsers look for -- confirm against the target server.
_PROPFIND_HOME_SET = (
    '<?xml version="1.0" encoding="utf-8"?>\n'
    '<d:propfind xmlns:d="DAV:" xmlns:c="urn:ietf:params:xml:ns:caldav">\n'
    '  <d:prop>\n'
    '    <c:calendar-home-set/>\n'
    '  </d:prop>\n'
    '</d:propfind>'
)

_PROPFIND_CALENDARS = (
    '<?xml version="1.0" encoding="utf-8"?>\n'
    '<d:propfind xmlns:d="DAV:">\n'
    '  <d:prop>\n'
    '    <d:displayname/>\n'
    '    <d:resourcetype/>\n'
    '  </d:prop>\n'
    '</d:propfind>'
)

# ``{condition}`` is the only placeholder; it receives the VEVENT-level filter.
_CALENDAR_QUERY = (
    '<?xml version="1.0" encoding="utf-8"?>\n'
    '<c:calendar-query xmlns:d="DAV:" xmlns:c="urn:ietf:params:xml:ns:caldav">\n'
    '  <d:prop>\n'
    '    <d:getetag/>\n'
    '    <c:calendar-data/>\n'
    '  </d:prop>\n'
    '  <c:filter>\n'
    '    <c:comp-filter name="VCALENDAR">\n'
    '      <c:comp-filter name="VEVENT">\n'
    '        {condition}\n'
    '      </c:comp-filter>\n'
    '    </c:comp-filter>\n'
    '  </c:filter>\n'
    '</c:calendar-query>'
)


# ---------------------------------------------------------------------------
# HTTP / WebDAV helpers
# ---------------------------------------------------------------------------

def make_request(url, method='PROPFIND', body=None, depth='1', etag=None,
                 timeout=30):
    """Send an authenticated WebDAV request and return the decoded body.

    Args:
        url: Absolute URL of the resource.
        method: HTTP/WebDAV verb (PROPFIND, REPORT, GET, PUT, DELETE, ...).
        body: Request body as text, or None for no body.
        depth: Value for the ``Depth`` header; falsy to omit the header.
        etag: If given, sent as ``If-Match`` so writes fail on stale data.
        timeout: Socket timeout in seconds so the CLI cannot hang forever.

    Returns:
        The response body decoded as UTF-8.

    Raises:
        urllib.error.URLError: On HTTP errors or connection failures.
    """
    pw_mgr = urllib.request.HTTPPasswordMgrWithDefaultRealm()
    pw_mgr.add_password(None, NEXTCLOUD_URL, NEXTCLOUD_USER, NEXTCLOUD_PASSWORD)
    opener = urllib.request.build_opener(
        urllib.request.HTTPBasicAuthHandler(pw_mgr))

    req = urllib.request.Request(
        url, data=body.encode('utf-8') if body else None, method=method)
    # A PUT carries an iCalendar object; every other body we send is XML.
    if method == 'PUT':
        req.add_header('Content-Type', 'text/calendar; charset=utf-8')
    else:
        req.add_header('Content-Type', 'text/xml; charset=utf-8')
    if depth:
        req.add_header('Depth', depth)
    if etag:
        req.add_header('If-Match', etag)
    req.add_header('User-Agent', 'OpenClaw-Calendar/1.0')

    # Context manager guarantees the connection is released.
    with opener.open(req, timeout=timeout) as resp:
        return resp.read().decode('utf-8')


def _absolute_url(href):
    """Resolve a server-returned href against NEXTCLOUD_URL.

    ``urljoin`` leaves absolute URLs untouched and, for path-absolute hrefs,
    replaces the base path instead of appending to it, so installations under
    a sub-path (``https://host/nextcloud``) do not get the prefix twice.
    """
    return urllib.parse.urljoin(NEXTCLOUD_URL + '/', href)


def get_calendar_home():
    """Return the absolute URL of the principal's calendar home collection.

    Falls back to the conventional Nextcloud path when the server response
    contains no usable ``calendar-home-set``.
    """
    principal_url = f"{NEXTCLOUD_URL}{CALDAV_PRINCIPAL}"
    resp = make_request(principal_url, body=_PROPFIND_HOME_SET, depth='0')
    root = ET.fromstring(resp)
    elem = root.find(f'.//{NS_CALDAV}calendar-home-set/{NS_DAV}href')
    href = (elem.text or '').strip() if elem is not None else ''
    if href:
        return _absolute_url(href)
    # fallback
    return f"{NEXTCLOUD_URL}/remote.php/dav/calendars/{NEXTCLOUD_USER}/"


def get_calendars():
    """List the calendar collections below the calendar home.

    Returns:
        A list of dicts with keys ``name``, ``href`` and ``url``.
    """
    home = get_calendar_home()
    resp = make_request(home, body=_PROPFIND_CALENDARS, depth='1')
    root = ET.fromstring(resp)
    cals = []
    for response in root.findall(f'{NS_DAV}response'):
        href_el = response.find(f'{NS_DAV}href')
        href = (href_el.text or '').strip() if href_el is not None else ''
        if not href.endswith('/'):
            continue
        # A Depth:1 listing also returns the home collection itself plus
        # inbox/outbox-style collections; only real calendars qualify.
        if response.find(f'.//{NS_DAV}resourcetype/{NS_CALDAV}calendar') is None:
            continue
        name_el = response.find(f'.//{NS_DAV}displayname')
        name = name_el.text if name_el is not None and name_el.text else None
        if not name:
            name = href.strip('/').split('/')[-1]
        cals.append({'name': name, 'href': href, 'url': _absolute_url(href)})
    return cals


def get_calendar_url_by_name(name=None):
    """Return the URL of the calendar called *name* (default: the first one).

    Raises:
        LookupError: If there are no calendars or *name* does not match any.
    """
    cals = get_calendars()
    if not cals:
        raise LookupError("No calendars found")
    if name:
        for cal in cals:
            if cal['name'] == name:
                return cal['url']
        raise LookupError(f"Calendar '{name}' not found")
    # default: first
    return cals[0]['url']


# ---------------------------------------------------------------------------
# iCalendar parsing / serialisation
# ---------------------------------------------------------------------------

def parse_datetime_ical(dt_str):
    """Parse an iCalendar DATE or DATE-TIME value.

    Returns:
        An aware UTC ``datetime`` for ``...Z`` values, a naive ``datetime``
        for local/floating values, a ``date`` for date-only (all-day) values,
        or the stripped input string if none of the formats match.
    """
    dt_str = dt_str.strip()
    if dt_str.endswith('Z'):
        try:
            return datetime.strptime(
                dt_str, '%Y%m%dT%H%M%SZ').replace(tzinfo=timezone.utc)
        except ValueError:
            pass
    try:
        return datetime.strptime(dt_str, '%Y%m%dT%H%M%S')
    except ValueError:
        pass
    try:
        # Keep all-day values as plain dates so they are not silently turned
        # into midnight timed events when written back.
        return datetime.strptime(dt_str, '%Y%m%d').date()
    except ValueError:
        return dt_str


def format_dt(dt):
    """Format a datetime/date for display; anything else is ``str()``-ed."""
    if isinstance(dt, datetime):
        return dt.strftime('%Y-%m-%d %H:%M')
    if isinstance(dt, date):
        return dt.strftime('%Y-%m-%d')
    return str(dt)


def _escape_text(text):
    """Escape a string for use as an iCalendar TEXT value (RFC 5545 3.3.11)."""
    return (text.replace('\\', '\\\\')
                .replace(';', '\\;')
                .replace(',', '\\,')
                .replace('\r\n', '\\n')
                .replace('\n', '\\n'))


def _unescape_text(text):
    """Reverse ``_escape_text``; unknown escape sequences are left as-is."""
    return re.sub(
        r'\\([\\;,nN])',
        lambda m: '\n' if m.group(1) in 'nN' else m.group(1),
        text)


def _unfold_ical(ical_text):
    """Split iCalendar text into logical content lines, undoing folding.

    A physical line starting with a space or tab continues the previous
    line. The check is made on the raw line, before any stripping.
    """
    logical = []
    for raw in ical_text.replace('\r\n', '\n').split('\n'):
        raw = raw.rstrip('\r')
        if raw[:1] in (' ', '\t'):
            if logical:
                logical[-1] += raw[1:]
            continue
        if raw.strip():
            logical.append(raw)
    return logical


def _fold_ical_line(line, limit=75):
    """Fold a content line at *limit* characters (RFC 5545 3.1).

    The limit is counted in characters, not octets; the 75-octet rule is a
    SHOULD, so this is a best-effort fold for non-ASCII text.
    """
    if len(line) <= limit:
        return line
    chunks = [line[:limit]]
    pos = limit
    while pos < len(line):
        # Continuation lines spend one character on the leading space.
        chunks.append(' ' + line[pos:pos + limit - 1])
        pos += limit - 1
    return '\r\n'.join(chunks)


def _serialize_ical(lines):
    """Join logical content lines into CRLF-terminated iCalendar text."""
    return '\r\n'.join(_fold_ical_line(line) for line in lines) + '\r\n'


def _split_content_line(line):
    """Split a content line into ``(NAME, [params], value)``.

    Lines without a colon yield ``('', [], '')`` so callers ignore them.
    """
    head, sep, value = line.partition(':')
    if not sep:
        return '', [], ''
    name, *params = head.split(';')
    return name.strip().upper(), params, value


def _tzid_of(params):
    """Return the TZID parameter value from a parameter list, or None."""
    for param in params:
        key, _, val = param.partition('=')
        if key.strip().upper() == 'TZID':
            return val.strip().strip('"') or None
    return None


def _walk_ical(ical_text):
    """Yield ``(role, line, name, params, value)`` for each logical line.

    ``role`` is ``'prop'`` for properties directly inside the first VEVENT
    (not inside a nested VALARM, and not inside VTIMEZONE), ``'end'`` for
    that VEVENT's END line, and ``'other'`` for everything else. Text with
    no VEVENT wrapper at all is treated as a bare list of event properties.
    """
    lines = _unfold_ical(ical_text)
    has_vevent = False
    for line in lines:
        name, _params, value = _split_content_line(line)
        if name == 'BEGIN' and value.strip().upper() == 'VEVENT':
            has_vevent = True
            break

    stack = []
    finished = False
    for line in lines:
        name, params, value = _split_content_line(line)
        role = 'other'
        if name == 'BEGIN':
            stack.append(value.strip().upper())
        elif name == 'END':
            if stack:
                if stack[-1] == 'VEVENT' and not finished:
                    role = 'end'
                    finished = True
                stack.pop()
        elif name and not finished:
            inside = stack[-1] == 'VEVENT' if stack else not has_vevent
            if inside:
                role = 'prop'
        yield role, line, name, params, value


def parse_ical_event(ical_text):
    """Extract the main fields of the first VEVENT in *ical_text*.

    Only properties that belong directly to the first VEVENT are read, so
    values from VTIMEZONE or VALARM sub-components cannot leak into the
    result. Later VEVENTs (recurrence overrides) are ignored.

    Returns:
        A dict with keys ``summary``, ``start``, ``end``, ``description``,
        ``uid``, ``rrule``, ``start_tzid`` and ``end_tzid``.
    """
    ev = {'summary': 'No title', 'start': None, 'end': None,
          'description': '', 'uid': None, 'rrule': None,
          'start_tzid': None, 'end_tzid': None}
    for role, _line, name, params, value in _walk_ical(ical_text):
        if role != 'prop':
            continue
        if name == 'SUMMARY':
            ev['summary'] = _unescape_text(value)
        elif name == 'DESCRIPTION':
            ev['description'] = _unescape_text(value)
        elif name == 'DTSTART':
            ev['start'] = parse_datetime_ical(value)
            ev['start_tzid'] = _tzid_of(params)
        elif name == 'DTEND':
            ev['end'] = parse_datetime_ical(value)
            ev['end_tzid'] = _tzid_of(params)
        elif name == 'UID':
            ev['uid'] = value.strip()
        elif name == 'RRULE':
            ev['rrule'] = value.strip()
    return ev


def _format_ical_dt(prop, value, tzid=None):
    """Build a DTSTART/DTEND content line, or return None if not a date.

    Aware datetimes are written in UTC with a ``Z`` suffix, naive datetimes
    as local time (with ``TZID`` when one is supplied), and dates with
    ``VALUE=DATE`` as RFC 5545 requires for all-day values.
    """
    if isinstance(value, datetime):
        if value.tzinfo is not None:
            stamp = value.astimezone(timezone.utc).strftime('%Y%m%dT%H%M%SZ')
            return f"{prop}:{stamp}"
        stamp = value.strftime('%Y%m%dT%H%M%S')
        if tzid:
            return f"{prop};TZID={tzid}:{stamp}"
        return f"{prop}:{stamp}"
    if isinstance(value, date):
        return f"{prop};VALUE=DATE:{value.strftime('%Y%m%d')}"
    return None


def ical_dump(ev):
    """Serialise an event dict (as produced by ``parse_ical_event``).

    Only the fields known to this tool are written. To modify an existing
    event without losing its other properties, use ``patch_ical_event``.

    Returns:
        CRLF-terminated iCalendar text containing a single VEVENT.
    """
    ics = [
        'BEGIN:VCALENDAR',
        'VERSION:2.0',
        'PRODID:-//OpenClaw//Calendar//EN',
        'BEGIN:VEVENT',
    ]
    if ev.get('uid'):
        ics.append(f"UID:{ev['uid']}")
    # DTSTAMP is a required VEVENT property.
    ics.append(
        f"DTSTAMP:{datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')}")
    if ev.get('rrule'):
        ics.append(f"RRULE:{ev['rrule']}")
    for prop, key, tz_key in (('DTSTART', 'start', 'start_tzid'),
                              ('DTEND', 'end', 'end_tzid')):
        line = _format_ical_dt(prop, ev.get(key), ev.get(tz_key))
        if line:
            ics.append(line)
    ics.append(f"SUMMARY:{_escape_text(ev.get('summary') or '')}")
    if ev.get('description'):
        ics.append(f"DESCRIPTION:{_escape_text(ev['description'])}")
    ics.append('END:VEVENT')
    ics.append('END:VCALENDAR')
    return _serialize_ical(ics)


def patch_ical_event(ical_text, changes):
    """Return *ical_text* with properties of its first VEVENT replaced.

    Everything not named in *changes* (location, alarms, attendees, time
    zone definitions, ...) is preserved.

    Args:
        ical_text: The existing iCalendar object.
        changes: Maps a property name (e.g. ``'SUMMARY'``) to the complete
            replacement content line, or to None to remove the property.

    Raises:
        ValueError: If *ical_text* contains no complete VEVENT.
    """
    wanted = {name.upper(): line for name, line in changes.items()}
    out = []
    patched = False
    for role, line, name, _params, _value in _walk_ical(ical_text):
        if role == 'prop' and name in wanted:
            continue  # dropped here; the replacement is added before END
        if role == 'end':
            out.extend(new for new in wanted.values() if new is not None)
            patched = True
        out.append(line)
    if not patched:
        raise ValueError("iCalendar text contains no VEVENT to update")
    return _serialize_ical(out)


# ---------------------------------------------------------------------------
# Calendar queries
# ---------------------------------------------------------------------------

def _midnight(day):
    """Return the naive datetime at 00:00 of *day*."""
    return datetime.combine(day, datetime.min.time())


def _to_utc_stamp(value):
    """Format a date/datetime as a UTC ``...Z`` stamp for time-range filters.

    Naive values are interpreted as local time and converted, rather than
    being labelled ``Z`` without conversion.
    """
    if not isinstance(value, datetime):
        value = _midnight(value)
    return value.astimezone(timezone.utc).strftime('%Y%m%dT%H%M%SZ')


def _run_calendar_query(calendar_url, condition, search=None):
    """Run a calendar-query REPORT and return the parsed events.

    Each event dict is the ``parse_ical_event`` result plus ``etag``,
    ``href`` and ``ical`` (the raw calendar data the ETag belongs to).

    Raises:
        RuntimeError: If the request fails. Failures are not reported as an
            empty result, which would be indistinguishable from "no events".
    """
    body = _CALENDAR_QUERY.format(condition=condition)
    try:
        resp = make_request(calendar_url, method='REPORT', body=body, depth='1')
    except OSError as exc:  # URLError/HTTPError and socket timeouts
        raise RuntimeError(f"Calendar query failed: {exc}") from exc

    needle = search.lower() if search else None
    root = ET.fromstring(resp)
    events = []
    for response in root.findall(f'{NS_DAV}response'):
        href = response.find(f'{NS_DAV}href')
        caldata = response.find(f'.//{NS_CALDAV}calendar-data')
        if href is None or caldata is None or not caldata.text:
            continue
        ev = parse_ical_event(caldata.text)
        if needle and needle not in ev['summary'].lower() \
                and needle not in ev['description'].lower():
            continue
        etag = response.find(f'.//{NS_DAV}getetag')
        ev['etag'] = etag.text if etag is not None else None
        ev['href'] = (href.text or '').strip()
        ev['ical'] = caldata.text
        events.append(ev)
    return events


def query_events(calendar_url, start_dt, end_dt, search=None):
    """Return events overlapping ``[start_dt, end_dt)``.

    Args:
        calendar_url: Absolute URL of the calendar collection.
        start_dt, end_dt: ``date`` or ``datetime`` bounds; naive values are
            taken as local time.
        search: Optional case-insensitive substring that must occur in the
            summary or description.

    Raises:
        RuntimeError: If the server request fails.
    """
    condition = (f'<c:time-range start="{_to_utc_stamp(start_dt)}" '
                 f'end="{_to_utc_stamp(end_dt)}"/>')
    return _run_calendar_query(calendar_url, condition, search=search)


def _find_event_by_uid(calendar_url, uid):
    """Return the event with the given UID, or None.

    Uses a UID property filter, so the event is found regardless of how far
    in the past or future it lies.
    """
    condition = ('<c:prop-filter name="UID">'
                 f'<c:text-match collation="i;octet">{xml_escape(uid)}'
                 '</c:text-match></c:prop-filter>')
    for ev in _run_calendar_query(calendar_url, condition):
        if ev.get('uid') == uid:
            return ev
    return None


def _events_on_day(calendar_url, day_text, summary):
    """Return events on the ISO date *day_text* whose text matches *summary*."""
    day_start = _midnight(date.fromisoformat(day_text))
    return query_events(calendar_url, day_start,
                        day_start + timedelta(days=1), search=summary)


def _sort_key(ev):
    """Sort key that tolerates dates, naive/aware datetimes and odd values.

    Aware datetimes are converted to local naive time; values that did not
    parse (or are missing) sort first.
    """
    start = ev.get('start')
    if isinstance(start, datetime):
        if start.tzinfo is not None:
            return start.astimezone().replace(tzinfo=None)
        return start
    if isinstance(start, date):
        return _midnight(start)
    return datetime.min


# ---------------------------------------------------------------------------
# Commands
# ---------------------------------------------------------------------------

def cmd_list(args):
    """Print the names of all calendars."""
    cals = get_calendars()
    if not cals:
        print("No calendars found.")
        return
    print("Calendars:")
    for c in cals:
        print(f"- {c['name']}")


def cmd_events(args):
    """Print events for today, a given date, or a start/end range."""
    cal_url = get_calendar_url_by_name(args.calendar)
    one_day = timedelta(days=1)
    if args.date:
        start_dt = _midnight(date.fromisoformat(args.date))
        end_dt = start_dt + one_day
    elif args.start:
        start_dt = datetime.fromisoformat(args.start)
        # Without --end, show one day from --start instead of silently
        # ignoring --start.
        end_dt = (datetime.fromisoformat(args.end) if args.end
                  else start_dt + one_day)
    else:
        # --today, and also the default
        start_dt = _midnight(date.today())
        end_dt = start_dt + one_day

    events = query_events(cal_url, start_dt, end_dt, search=args.search)
    if not events:
        print("No events found.")
        return
    # sort by start
    events.sort(key=_sort_key)
    out = []
    for ev in events:
        start = ev.get('start')
        time_str = format_dt(start) if start else 'All-day'
        out.append(f"[{time_str}] {ev.get('summary', '')}")
    print("\n".join(out))


def cmd_add(args):
    """Create a new event (default duration: one hour)."""
    cal_url = get_calendar_url_by_name(args.calendar)
    start_dt = datetime.fromisoformat(args.start)
    end_dt = (datetime.fromisoformat(args.end) if args.end
              else start_dt + timedelta(hours=1))
    uid = str(uuid.uuid4())
    # Built through ical_dump so escaping, CRLF line endings and DTSTAMP are
    # handled in one place.
    ics = ical_dump({
        'uid': uid,
        'summary': args.summary,
        'start': start_dt,
        'end': end_dt,
        'rrule': args.recurrence,
        'description': args.description,
    })
    event_url = f"{cal_url.rstrip('/')}/{uid}.ics"
    try:
        make_request(event_url, method='PUT', body=ics, depth=None)
        print(f"Added event: {args.summary}")
    except Exception as e:
        print(f"Failed to add event: {e}", file=sys.stderr)
        sys.exit(1)


def cmd_update(args):
    """Modify an existing event identified by UID or by summary + date.

    The stored iCalendar object is patched in place, so properties this tool
    does not know about survive the update.
    """
    cal_url = get_calendar_url_by_name(args.calendar)
    if args.uid:
        target = _find_event_by_uid(cal_url, args.uid)
        if not target:
            print(f"Event with UID {args.uid} not found.", file=sys.stderr)
            sys.exit(1)
    else:
        # Must have summary + date to identify
        if not args.summary or not args.date:
            print("Need either --uid or (--summary and --date) to identify event.", file=sys.stderr)
            sys.exit(1)
        candidates = _events_on_day(cal_url, args.date, args.summary)
        if not candidates:
            print(f"Event not found on {args.date} with summary containing '{args.summary}'.", file=sys.stderr)
            sys.exit(1)
        if len(candidates) > 1:
            print(f"Multiple matches; narrow search or use --uid.", file=sys.stderr)
            sys.exit(1)
        target = candidates[0]

    update_url = _absolute_url(target['href'])
    etag = target['etag']
    # The REPORT already returned the calendar data matching this ETag; only
    # fall back to a GET if it is somehow missing.
    ical_text = target.get('ical') or make_request(
        update_url, method='GET', depth=None)
    ev_data = parse_ical_event(ical_text)

    # Apply updates
    changes = {}
    if args.set_summary is not None:
        ev_data['summary'] = args.set_summary
        changes['SUMMARY'] = f"SUMMARY:{_escape_text(args.set_summary)}"
    if args.set_start:
        ev_data['start'] = datetime.fromisoformat(args.set_start)
        # A naive new time keeps the event's original time zone, if any.
        changes['DTSTART'] = _format_ical_dt(
            'DTSTART', ev_data['start'], ev_data.get('start_tzid'))
    if args.set_end:
        ev_data['end'] = datetime.fromisoformat(args.set_end)
        changes['DTEND'] = _format_ical_dt(
            'DTEND', ev_data['end'],
            ev_data.get('end_tzid') or ev_data.get('start_tzid'))
        # DTEND and DURATION are mutually exclusive in a VEVENT.
        changes['DURATION'] = None
    if args.set_recurrence is not None:
        ev_data['rrule'] = args.set_recurrence if args.set_recurrence else None
        changes['RRULE'] = (f"RRULE:{args.set_recurrence}"
                            if args.set_recurrence else None)
    if not changes:
        print("Nothing to update; pass at least one --set-* option.", file=sys.stderr)
        sys.exit(1)

    new_ical = patch_ical_event(ical_text, changes)
    try:
        make_request(update_url, method='PUT', body=new_ical, depth=None,
                     etag=etag)
        print(f"Updated event: {ev_data.get('summary')}")
    except Exception as e:
        print(f"Failed to update: {e}", file=sys.stderr)
        sys.exit(1)


def cmd_delete(args):
    """Delete an event identified by UID or by summary + date."""
    cal_url = get_calendar_url_by_name(args.calendar)
    if args.uid:
        target = _find_event_by_uid(cal_url, args.uid)
        if not target:
            print(f"Event with UID {args.uid} not found.", file=sys.stderr)
            sys.exit(1)
    elif args.date and args.summary:
        candidates = _events_on_day(cal_url, args.date, args.summary)
        if not candidates:
            print("Event not found.", file=sys.stderr)
            sys.exit(1)
        if len(candidates) > 1:
            print("Multiple matches; use --uid to be specific.", file=sys.stderr)
            sys.exit(1)
        target = candidates[0]
    else:
        print("Need --uid or both --date and --summary.", file=sys.stderr)
        sys.exit(1)

    delete_url = _absolute_url(target['href'])
    try:
        make_request(delete_url, method='DELETE', depth=None,
                     etag=target['etag'])
        print(f"Deleted event: {target.get('summary')}")
    except Exception as e:
        print(f"Failed to delete: {e}", file=sys.stderr)
        sys.exit(1)


def cmd_test(args):
    """Check that all settings are present and the server is reachable."""
    required = (
        ('NEXTCLOUD_URL', NEXTCLOUD_URL),
        ('NEXTCLOUD_USER', NEXTCLOUD_USER),
        ('NEXTCLOUD_PASSWORD', NEXTCLOUD_PASSWORD),
        ('CALDAV_PRINCIPAL', CALDAV_PRINCIPAL),
    )
    msg = [f"{name} not set" for name, value in required if not value]
    if msg:
        print("Missing config:\n " + "\n ".join(msg))
        sys.exit(1)
    try:
        get_calendar_home()
        print("Connection successful.")
    except Exception as e:
        print(f"Connection failed: {e}")
        sys.exit(1)


def main():
    """Parse the command line and dispatch to the selected sub-command."""
    parser = argparse.ArgumentParser(description='Unified Nextcloud CalDAV CLI')
    sub = parser.add_subparsers(dest='cmd', required=True)

    sub_list = sub.add_parser('list', help='List calendars')
    sub_list.set_defaults(func=cmd_list)

    sub_events = sub.add_parser('events', help='View events')
    sub_events.add_argument('--calendar', help='Calendar name (default: first)')
    grp = sub_events.add_mutually_exclusive_group()
    grp.add_argument('--today', action='store_true')
    grp.add_argument('--date', help='Specific date YYYY-MM-DD')
    grp.add_argument('--start', '--begin', help='Start datetime ISO')
    sub_events.add_argument('--end', help='End datetime ISO (with --start)')
    sub_events.add_argument('--search', help='Text search in summary/description')
    sub_events.set_defaults(func=cmd_events)

    sub_add = sub.add_parser('add', help='Add event')
    sub_add.add_argument('--calendar', help='Calendar name')
    sub_add.add_argument('--summary', required=True, help='Event title')
    sub_add.add_argument('--start', required=True, help='Start datetime ISO (YYYY-MM-DD HH:MM:SS or ISO format)')
    sub_add.add_argument('--end', help='End datetime ISO (default: start + 1h)')
    sub_add.add_argument('--recurrence', help='RRULE string (e.g., FREQ=WEEKLY;BYDAY=MO)')
    sub_add.add_argument('--description', help='Event description')
    sub_add.set_defaults(func=cmd_add)

    sub_update = sub.add_parser('update', help='Update existing event')
    sub_update.add_argument('--calendar', help='Calendar name')
    idgrp = sub_update.add_mutually_exclusive_group(required=True)
    idgrp.add_argument('--uid', help='Event UID to update')
    idgrp.add_argument('--summary', help='Event title (partial) match')
    sub_update.add_argument('--date', help='Date of event (required with --summary)')
    sub_update.add_argument('--set-summary', help='New summary')
    sub_update.add_argument('--set-start', help='New start datetime ISO')
    sub_update.add_argument('--set-end', help='New end datetime ISO')
    sub_update.add_argument('--set-recurrence', help='New RRULE (or empty to remove)')
    sub_update.set_defaults(func=cmd_update)

    sub_delete = sub.add_parser('delete', help='Delete event')
    sub_delete.add_argument('--calendar', help='Calendar name')
    delgrp = sub_delete.add_mutually_exclusive_group(required=True)
    delgrp.add_argument('--uid', help='Event UID to delete')
    delgrp.add_argument('--summary', help='Event title match')
    sub_delete.add_argument('--date', help='Date of event (required with --summary)')
    sub_delete.set_defaults(func=cmd_delete)

    sub_test = sub.add_parser('test', help='Test connection and config')
    sub_test.set_defaults(func=cmd_test)

    args = parser.parse_args()
    try:
        args.func(args)
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == '__main__':
    main()