#!/usr/bin/env python3
"""
Unified Nextcloud CalDAV CLI.

Commands:
  list      - List calendars
  events    - View events (supports --today, --date, --start/--end, --search)
  add       - Create event (--summary, --start, --end, --recurrence, --description)
  update    - Modify event (--uid OR --summary+--date, with --set-* options)
  delete    - Remove event (--uid OR --summary+--date)
  exception - Create exception for recurring event instance (--uid, --date, --start, --end)
  test      - Verify connection

Requires environment variables:
  NEXTCLOUD_URL, NEXTCLOUD_USER, NEXTCLOUD_PASSWORD, CALDAV_PRINCIPAL
"""

import argparse
import json
import os
import sys
import urllib.error
import urllib.parse
import urllib.request
import uuid
import xml.etree.ElementTree as ET
from datetime import datetime, date, timedelta, timezone
from xml.sax.saxutils import escape as xml_escape

# Config from environment
NEXTCLOUD_URL = os.getenv('NEXTCLOUD_URL', '').rstrip('/')
NEXTCLOUD_USER = os.getenv('NEXTCLOUD_USER', '')
NEXTCLOUD_PASSWORD = os.getenv('NEXTCLOUD_PASSWORD', '')
CALDAV_PRINCIPAL = os.getenv('CALDAV_PRINCIPAL', '')

NS_DAV = '{DAV:}'
NS_CALDAV = '{urn:ietf:params:xml:ns:caldav}'

# Seconds to wait for the server before giving up on a request.
REQUEST_TIMEOUT = 30

# NOTE(review): the request bodies were blank in the reviewed source. The ones
# below are the standard RFC 4791 / RFC 4918 forms for the properties this
# script reads from the responses -- confirm against the deployed server.
_HOME_SET_PROPFIND = (
    '<?xml version="1.0" encoding="utf-8"?>\n'
    '<d:propfind xmlns:d="DAV:" xmlns:c="urn:ietf:params:xml:ns:caldav">\n'
    '  <d:prop><c:calendar-home-set/></d:prop>\n'
    '</d:propfind>'
)

_CALENDAR_LIST_PROPFIND = (
    '<?xml version="1.0" encoding="utf-8"?>\n'
    '<d:propfind xmlns:d="DAV:">\n'
    '  <d:prop><d:displayname/><d:resourcetype/></d:prop>\n'
    '</d:propfind>'
)

_TIME_RANGE_QUERY = (
    '<?xml version="1.0" encoding="utf-8"?>\n'
    '<c:calendar-query xmlns:d="DAV:" xmlns:c="urn:ietf:params:xml:ns:caldav">\n'
    '  <d:prop><d:getetag/><c:calendar-data/></d:prop>\n'
    '  <c:filter>\n'
    '    <c:comp-filter name="VCALENDAR">\n'
    '      <c:comp-filter name="VEVENT">\n'
    '        <c:time-range start="{start}" end="{end}"/>\n'
    '      </c:comp-filter>\n'
    '    </c:comp-filter>\n'
    '  </c:filter>\n'
    '</c:calendar-query>'
)

_UID_QUERY = (
    '<?xml version="1.0" encoding="utf-8"?>\n'
    '<c:calendar-query xmlns:d="DAV:" xmlns:c="urn:ietf:params:xml:ns:caldav">\n'
    '  <d:prop><d:getetag/><c:calendar-data/></d:prop>\n'
    '  <c:filter>\n'
    '    <c:comp-filter name="VCALENDAR">\n'
    '      <c:comp-filter name="VEVENT">\n'
    '        <c:prop-filter name="UID">\n'
    '          <c:text-match collation="i;octet">{uid}</c:text-match>\n'
    '        </c:prop-filter>\n'
    '      </c:comp-filter>\n'
    '    </c:comp-filter>\n'
    '  </c:filter>\n'
    '</c:calendar-query>'
)

_PRODID = '-//OpenClaw//Calendar//EN'


def make_request(url, method='PROPFIND', body=None, depth='1', etag=None):
    """Send an authenticated WebDAV/CalDAV request and return the decoded body.

    Args:
        url: Absolute URL of the resource.
        method: HTTP/WebDAV method name.
        body: Optional request body (str); encoded as UTF-8.
        depth: Value for the ``Depth`` header; only sent for PROPFIND/REPORT,
            the two methods this script issues that use it.
        etag: Optional ETag sent as ``If-Match`` for conditional writes.

    Raises:
        urllib.error.URLError / HTTPError: on transport or HTTP failure.
    """
    password_mgr = urllib.request.HTTPPasswordMgrWithDefaultRealm()
    password_mgr.add_password(None, NEXTCLOUD_URL, NEXTCLOUD_USER, NEXTCLOUD_PASSWORD)
    opener = urllib.request.build_opener(urllib.request.HTTPBasicAuthHandler(password_mgr))

    data = body.encode('utf-8') if body else None
    req = urllib.request.Request(url, data=data, method=method)
    if data is not None:
        # PUT uploads iCalendar data; PROPFIND/REPORT bodies are XML.
        media_type = 'text/calendar' if method == 'PUT' else 'text/xml'
        req.add_header('Content-Type', f'{media_type}; charset=utf-8')
    if depth and method in ('PROPFIND', 'REPORT'):
        req.add_header('Depth', depth)
    if etag:
        req.add_header('If-Match', etag)
    req.add_header('User-Agent', 'OpenClaw-Calendar/1.0')

    # The context manager closes the connection even if read/decode fails.
    with opener.open(req, timeout=REQUEST_TIMEOUT) as resp:
        return resp.read().decode('utf-8')


def _absolute_url(href):
    """Resolve a DAV ``href`` against NEXTCLOUD_URL.

    Absolute hrefs are returned unchanged. Path-absolute hrefs replace the
    path of NEXTCLOUD_URL instead of being appended to it, so a server hosted
    under a sub-path does not get that prefix twice.
    """
    return urllib.parse.urljoin(f"{NEXTCLOUD_URL}/", href)


def get_calendar_home():
    """Return the URL of the user's calendar home collection.

    Asks the principal for ``calendar-home-set``; falls back to the
    conventional ``/remote.php/dav/calendars/<user>/`` path when the property
    is absent or empty.
    """
    principal_url = f"{NEXTCLOUD_URL}{CALDAV_PRINCIPAL}"
    resp = make_request(principal_url, body=_HOME_SET_PROPFIND, depth='0')
    root = ET.fromstring(resp)
    href = (root.findtext(f'.//{NS_CALDAV}calendar-home-set/{NS_DAV}href') or '').strip()
    if href:
        return _absolute_url(href)
    return f"{NEXTCLOUD_URL}/remote.php/dav/calendars/{NEXTCLOUD_USER}/"


def get_calendars():
    """List calendar collections under the calendar home.

    Returns:
        A list of dicts with keys ``name``, ``href`` and ``url``.
    """
    home = get_calendar_home()
    resp = make_request(home, body=_CALENDAR_LIST_PROPFIND, depth='1')
    root = ET.fromstring(resp)

    prop_path = f'{NS_DAV}propstat/{NS_DAV}prop'
    cals = []
    for response in root.findall(f'{NS_DAV}response'):
        href = (response.findtext(f'{NS_DAV}href') or '').strip()
        if not href.endswith('/'):
            continue
        # A Depth:1 listing also returns the home collection itself and
        # scheduling collections; keep only real calendar collections.
        if response.find(f'{prop_path}/{NS_DAV}resourcetype/{NS_CALDAV}calendar') is None:
            continue
        display = (response.findtext(f'{prop_path}/{NS_DAV}displayname') or '').strip()
        cals.append({
            'name': display or href.strip('/').split('/')[-1],
            'href': href,
            'url': _absolute_url(href),
        })
    return cals


def get_calendar_url_by_name(name=None):
    """Return the URL of the calendar called ``name``.

    Without a name, prefers the calendar named 'Personal' and otherwise the
    first calendar found.

    Raises:
        Exception: if there are no calendars or ``name`` does not match one.
    """
    cals = get_calendars()
    if not cals:
        raise Exception("No calendars found")
    wanted = name or 'Personal'
    for cal in cals:
        if cal['name'] == wanted:
            return cal['url']
    if name:
        raise Exception(f"Calendar '{name}' not found")
    return cals[0]['url']


def parse_datetime_ical(dt_str):
    """Parse an iCalendar DATE or DATE-TIME value.

    Returns:
        A UTC-aware ``datetime`` for ``...Z`` values, a naive ``datetime`` for
        local/floating values, a ``date`` for date-only values, or the
        stripped input string when it matches none of those forms.
    """
    dt_str = dt_str.strip()
    if dt_str.endswith('Z'):
        try:
            return datetime.strptime(dt_str, '%Y%m%dT%H%M%SZ').replace(tzinfo=timezone.utc)
        except ValueError:
            pass
    try:
        return datetime.strptime(dt_str, '%Y%m%dT%H%M%S')
    except ValueError:
        pass
    try:
        # Date-only values stay dates so all-day events are not silently
        # turned into timed events when written back.
        return datetime.strptime(dt_str, '%Y%m%d').date()
    except ValueError:
        return dt_str


def format_dt(dt):
    """Format a datetime/date for display; anything else is ``str()``-ed."""
    if isinstance(dt, datetime):
        return dt.strftime('%Y-%m-%d %H:%M')
    if isinstance(dt, date):
        return dt.strftime('%Y-%m-%d')
    return str(dt)


def _unescape_text(value):
    """Undo iCalendar TEXT escaping (``\\n``, ``\\,``, ``\\;``, ``\\\\``)."""
    out = []
    chars = iter(value)
    for ch in chars:
        if ch != '\\':
            out.append(ch)
            continue
        nxt = next(chars, None)
        if nxt is None:
            out.append('\\')  # lone trailing backslash: keep it
        elif nxt in 'nN':
            out.append('\n')
        else:
            out.append(nxt)
    return ''.join(out)


def _escape_text(value):
    """Apply iCalendar TEXT escaping so the value fits on one content line."""
    value = value.replace('\\', '\\\\')
    value = value.replace(';', '\\;').replace(',', '\\,')
    return value.replace('\r\n', '\\n').replace('\n', '\\n')


def unfold_ical_lines(ical_text):
    """Split iCalendar text into logical lines, joining folded continuations."""
    unfolded = []
    for line in ical_text.splitlines():
        if line.startswith((' ', '\t')) and unfolded:
            unfolded[-1] += line[1:]
        else:
            unfolded.append(line)
    return unfolded


def _split_unquoted(text, sep, maxsplit=-1):
    """Split ``text`` on ``sep``, ignoring separators inside double quotes."""
    parts = []
    buf = []
    in_quotes = False
    for ch in text:
        if ch == '"':
            in_quotes = not in_quotes
        if ch == sep and not in_quotes and (maxsplit < 0 or len(parts) < maxsplit):
            parts.append(''.join(buf))
            buf = []
        else:
            buf.append(ch)
    parts.append(''.join(buf))
    return parts


def parse_prop_line(line):
    """Parse one unfolded content line.

    Returns:
        ``(NAME, params, value)`` with NAME and parameter names upper-cased,
        or ``(None, {}, None)`` when the line has no ':' separator.
    """
    if ':' not in line:
        return None, {}, None
    pieces = _split_unquoted(line, ':', 1)
    if len(pieces) != 2:
        # Unbalanced quotes: fall back to the first colon.
        pieces = line.split(':', 1)
    name_params, value = pieces
    parts = _split_unquoted(name_params, ';')
    name = parts[0].strip().upper()
    params = {}
    for param in parts[1:]:
        if '=' in param:
            key, val = param.split('=', 1)
            params[key.upper()] = val
    return name, params, value


def _vevent_property_lines(ical_text):
    """Return one list of property lines per VEVENT.

    Lines belonging to components nested inside a VEVENT (e.g. VALARM) are
    left out so their properties cannot shadow the event's own.
    """
    events = []
    current = None
    depth = 0
    for line in unfold_ical_lines(ical_text):
        marker = line.strip().upper()
        if current is None:
            if marker == 'BEGIN:VEVENT':
                current, depth = [], 0
            continue
        if marker.startswith('BEGIN:'):
            depth += 1
        elif marker.startswith('END:'):
            if depth:
                depth -= 1
            elif marker == 'END:VEVENT':
                events.append(current)
                current = None
        elif depth == 0:
            current.append(line)
    return events


def _has_recurrence_id(prop_lines):
    """Tell whether a VEVENT's property lines include RECURRENCE-ID."""
    return any(parse_prop_line(line)[0] == 'RECURRENCE-ID' for line in prop_lines)


def parse_ical_event(ical_text):
    """Extract the main fields of the event stored in ``ical_text``.

    Reads the first VEVENT without a RECURRENCE-ID (falling back to the first
    VEVENT, then to all lines when the text contains no VEVENT wrapper).

    Returns:
        dict with keys ``summary``, ``start``, ``end``, ``description``,
        ``uid`` and ``rrule``.
    """
    ev = {'summary': 'No title', 'start': None, 'end': None,
          'description': '', 'uid': None, 'rrule': None}

    vevents = _vevent_property_lines(ical_text)
    if vevents:
        masters = [lines for lines in vevents if not _has_recurrence_id(lines)]
        prop_lines = masters[0] if masters else vevents[0]
    else:
        prop_lines = unfold_ical_lines(ical_text)

    for line in prop_lines:
        name, _params, value = parse_prop_line(line)
        if name == 'SUMMARY':
            ev['summary'] = _unescape_text(value)
        elif name == 'DESCRIPTION':
            ev['description'] = _unescape_text(value)
        elif name == 'DTSTART':
            ev['start'] = parse_datetime_ical(value)
        elif name == 'DTEND':
            ev['end'] = parse_datetime_ical(value)
        elif name == 'UID':
            ev['uid'] = value.strip()
        elif name == 'RRULE':
            ev['rrule'] = value.strip()
    return ev


def extract_master_event(ical_text, uid):
    """Return the properties of the master VEVENT for ``uid``.

    The master is the VEVENT with that UID and no RECURRENCE-ID.

    Returns:
        dict mapping property NAME to ``(raw_value, params)``, or ``None``.
    """
    for prop_lines in _vevent_property_lines(ical_text):
        props = {}
        for line in prop_lines:
            name, params, value = parse_prop_line(line)
            if name:
                props[name] = (value, params)
        if 'RECURRENCE-ID' in props:
            continue
        if props.get('UID', ('', {}))[0].strip() == uid:
            return props
    return None


def insert_exception(ical_text, exception_lines):
    """Insert a VEVENT (given as lines) before the final END:VCALENDAR.

    Uses the line ending already present in ``ical_text``.

    Raises:
        Exception: if ``ical_text`` has no END:VCALENDAR.
    """
    if 'END:VCALENDAR' not in ical_text:
        raise Exception('Invalid VCALENDAR data')
    eol = '\r\n' if '\r\n' in ical_text else '\n'
    head, tail = ical_text.rsplit('END:VCALENDAR', 1)
    head = head.rstrip('\r\n')
    return head + eol + eol.join(exception_lines) + eol + 'END:VCALENDAR' + tail


def format_ical_date(dt_obj):
    """Format a datetime as an iCalendar local DATE-TIME (no zone suffix)."""
    return dt_obj.strftime('%Y%m%dT%H%M%S')


def _utc_stamp(value):
    """Format a date/datetime as a UTC ``...Z`` stamp for time-range filters.

    Naive values are interpreted as system local time.
    """
    if not isinstance(value, datetime):
        value = datetime.combine(value, datetime.min.time())
    return value.astimezone(timezone.utc).strftime('%Y%m%dT%H%M%SZ')


def _run_calendar_query(calendar_url, body, search=None):
    """Run a calendar-query REPORT and parse each returned calendar object."""
    resp = make_request(calendar_url, method='REPORT', body=body, depth='1')
    root = ET.fromstring(resp)
    needle = search.lower() if search else None

    events = []
    for response in root.findall(f'{NS_DAV}response'):
        href = response.findtext(f'{NS_DAV}href')
        prop = response.find(f'{NS_DAV}propstat/{NS_DAV}prop')
        if href is None or prop is None:
            continue
        caldata = prop.findtext(f'{NS_CALDAV}calendar-data')
        if not caldata:
            continue
        ev = parse_ical_event(caldata)
        if needle and needle not in ev['summary'].lower() \
                and needle not in ev['description'].lower():
            continue
        ev['etag'] = prop.findtext(f'{NS_DAV}getetag')
        ev['href'] = href.strip()
        events.append(ev)
    return events


def query_events(calendar_url, start_dt, end_dt, search=None):
    """Return events overlapping ``[start_dt, end_dt)``.

    Args:
        calendar_url: URL of the calendar collection.
        start_dt, end_dt: date or datetime bounds (naive = local time).
        search: optional case-insensitive substring that must occur in the
            summary or description.

    Request and parse errors propagate to the caller instead of being
    reported as an empty result.
    """
    body = _TIME_RANGE_QUERY.format(start=_utc_stamp(start_dt), end=_utc_stamp(end_dt))
    return _run_calendar_query(calendar_url, body, search)


def _find_event_by_uid(calendar_url, uid):
    """Return the event whose UID equals ``uid`` (or ``None``).

    Filters on the UID property server-side, so the lookup is not limited to
    a date window; the exact comparison is repeated client-side.
    """
    body = _UID_QUERY.format(uid=xml_escape(uid))
    for ev in _run_calendar_query(calendar_url, body):
        if ev.get('uid') == uid:
            return ev
    return None


def _events_on_date(calendar_url, date_str, summary):
    """Return events on the ISO date ``date_str`` whose text matches ``summary``."""
    day_start = datetime.combine(date.fromisoformat(date_str), datetime.min.time())
    return query_events(calendar_url, day_start, day_start + timedelta(days=1), search=summary)


def _ical_date_prop(name, value):
    """Render a DTSTART/DTEND-style content line, or ``None`` if unsupported."""
    if isinstance(value, datetime):
        if value.tzinfo is not None:
            # Keep the absolute instant: emit aware values in UTC form.
            stamp = value.astimezone(timezone.utc).strftime('%Y%m%dT%H%M%SZ')
        else:
            stamp = value.strftime('%Y%m%dT%H%M%S')
        return f"{name}:{stamp}"
    if isinstance(value, date):
        return f"{name};VALUE=DATE:{value.strftime('%Y%m%d')}"
    return None


def ical_dump(ev):
    """Serialize an event dict (as produced by parse_ical_event) to iCalendar.

    Only uid, rrule, start, end, summary and description are written, plus a
    fresh DTSTAMP; lines are CRLF-separated.
    """
    ics = ['BEGIN:VCALENDAR', 'VERSION:2.0', f'PRODID:{_PRODID}', 'BEGIN:VEVENT']
    if ev.get('uid'):
        ics.append(f"UID:{ev['uid']}")
    ics.append(f"DTSTAMP:{datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')}")
    if ev.get('rrule'):
        ics.append(f"RRULE:{ev['rrule']}")
    for prop_name, key in (('DTSTART', 'start'), ('DTEND', 'end')):
        line = _ical_date_prop(prop_name, ev.get(key))
        if line:
            ics.append(line)
    ics.append(f"SUMMARY:{_escape_text(ev.get('summary') or '')}")
    if ev.get('description'):
        ics.append(f"DESCRIPTION:{_escape_text(ev['description'])}")
    ics.extend(['END:VEVENT', 'END:VCALENDAR'])
    return '\r\n'.join(ics)


def cmd_list(args):
    """Print the names of all calendars."""
    cals = get_calendars()
    if not cals:
        print("No calendars found.")
        return
    print("Calendars:")
    for cal in cals:
        print(f"- {cal['name']}")


def _start_sort_key(ev):
    """Sort key that puts aware, naive, date-only and unparsed starts on one scale."""
    start = ev.get('start')
    if isinstance(start, datetime):
        return start.astimezone().replace(tzinfo=None) if start.tzinfo else start
    if isinstance(start, date):
        return datetime.combine(start, datetime.min.time())
    return datetime.min


def cmd_events(args):
    """Print events for today, a given date, or an explicit start/end range."""
    cal_url = get_calendar_url_by_name(args.calendar)

    if args.start or args.end:
        if not (args.start and args.end):
            print("--start and --end must be given together.", file=sys.stderr)
            sys.exit(1)
        start_dt = datetime.fromisoformat(args.start)
        end_dt = datetime.fromisoformat(args.end)
    else:
        # --today and "no range given" both mean the current local day.
        day = date.fromisoformat(args.date) if args.date else date.today()
        start_dt = datetime.combine(day, datetime.min.time())
        end_dt = start_dt + timedelta(days=1)

    events = query_events(cal_url, start_dt, end_dt, search=args.search)
    if not events:
        print("No events found.")
        return
    events.sort(key=_start_sort_key)
    for ev in events:
        start = ev.get('start')
        time_str = format_dt(start) if start else 'All-day'
        print(f"[{time_str}] {ev.get('summary', '')}")


def cmd_add(args):
    """Create a new event; the end defaults to one hour after the start."""
    cal_url = get_calendar_url_by_name(args.calendar)
    start_dt = datetime.fromisoformat(args.start)
    end_dt = datetime.fromisoformat(args.end) if args.end else start_dt + timedelta(hours=1)
    uid = str(uuid.uuid4())

    body = ical_dump({
        'uid': uid,
        'summary': args.summary,
        'start': start_dt,
        'end': end_dt,
        'rrule': args.recurrence,
        'description': args.description,
    })
    event_url = f"{cal_url.rstrip('/')}/{uid}.ics"
    try:
        make_request(event_url, method='PUT', body=body)
        print(f"Added event: {args.summary}")
    except Exception as e:
        print(f"Failed to add event: {e}", file=sys.stderr)
        sys.exit(1)


def cmd_update(args):
    """Modify an event identified by --uid or by --summary plus --date.

    NOTE: the event is rewritten through ical_dump, so properties that
    function does not emit are not carried over to the updated resource.
    """
    cal_url = get_calendar_url_by_name(args.calendar)

    if args.uid:
        target = _find_event_by_uid(cal_url, args.uid)
        if not target:
            print(f"Event with UID {args.uid} not found.", file=sys.stderr)
            sys.exit(1)
    else:
        if not args.summary or not args.date:
            print("Need either --uid or (--summary and --date) to identify event.",
                  file=sys.stderr)
            sys.exit(1)
        candidates = _events_on_date(cal_url, args.date, args.summary)
        if not candidates:
            print(f"Event not found on {args.date} with summary containing '{args.summary}'.",
                  file=sys.stderr)
            sys.exit(1)
        if len(candidates) > 1:
            print("Multiple matches; narrow search or use --uid.", file=sys.stderr)
            sys.exit(1)
        target = candidates[0]

    etag = target['etag']
    event_url = _absolute_url(target['href'])
    ev_data = parse_ical_event(make_request(event_url, method='GET'))

    if args.set_summary is not None:
        ev_data['summary'] = args.set_summary
    if args.set_start:
        ev_data['start'] = datetime.fromisoformat(args.set_start)
    if args.set_end:
        ev_data['end'] = datetime.fromisoformat(args.set_end)
    if args.set_recurrence is not None:
        # An empty string removes the recurrence rule.
        ev_data['rrule'] = args.set_recurrence or None

    try:
        make_request(event_url, method='PUT', body=ical_dump(ev_data), etag=etag)
        print(f"Updated event: {ev_data.get('summary')}")
    except Exception as e:
        print(f"Failed to update: {e}", file=sys.stderr)
        sys.exit(1)


def cmd_delete(args):
    """Delete an event identified by --uid or by --summary plus --date."""
    cal_url = get_calendar_url_by_name(args.calendar)

    if args.uid:
        target = _find_event_by_uid(cal_url, args.uid)
        if not target:
            print(f"Event with UID {args.uid} not found.", file=sys.stderr)
            sys.exit(1)
    elif args.date and args.summary:
        candidates = _events_on_date(cal_url, args.date, args.summary)
        if not candidates:
            print("Event not found.", file=sys.stderr)
            sys.exit(1)
        if len(candidates) > 1:
            print("Multiple matches; use --uid.", file=sys.stderr)
            sys.exit(1)
        target = candidates[0]
    else:
        print("Need --uid or both --date and --summary.", file=sys.stderr)
        sys.exit(1)

    try:
        make_request(_absolute_url(target['href']), method='DELETE', etag=target['etag'])
        print(f"Deleted event: {target.get('summary')}")
    except Exception as e:
        print(f"Failed to delete: {e}", file=sys.stderr)
        sys.exit(1)


def cmd_exception(args):
    """Add an overriding VEVENT for one instance of a recurring event.

    The override reuses the master's UID, summary and description, gets a
    RECURRENCE-ID for ``--date`` at the master's time of day, and uses the
    new ``--start``/``--end`` times on that date.
    """
    cal_url = get_calendar_url_by_name(args.calendar)
    target = _find_event_by_uid(cal_url, args.uid)
    if not target:
        print(f"Event with UID {args.uid} not found.", file=sys.stderr)
        sys.exit(1)

    etag = target['etag']
    ical_url = _absolute_url(target['href'])
    ical_text = make_request(ical_url, method='GET')

    master = extract_master_event(ical_text, args.uid)
    if not master:
        print(f"Master event for UID {args.uid} not found.", file=sys.stderr)
        sys.exit(1)

    master_dtstart, dtstart_params = master.get('DTSTART', (None, {}))
    master_summary = master.get('SUMMARY', ('', {}))[0]
    master_description = master.get('DESCRIPTION', ('', {}))[0]
    master_sequence = master.get('SEQUENCE', ('0', {}))[0] or '0'

    master_dtstart = (master_dtstart or '').strip()
    if not master_dtstart:
        print("Master event missing DTSTART.", file=sys.stderr)
        sys.exit(1)

    tzid = dtstart_params.get('TZID')
    zone_param = f";TZID={tzid}" if tzid else ''
    day = date.fromisoformat(args.date).strftime('%Y%m%d')

    # RECURRENCE-ID names the original instance, so it takes the master's
    # value type and time of day rather than the new times.
    if 'T' not in master_dtstart:
        recurrence_line = f"RECURRENCE-ID;VALUE=DATE:{day}"
    elif master_dtstart.endswith('Z'):
        recurrence_line = f"RECURRENCE-ID{zone_param}:{day}T{master_dtstart[9:15]}Z"
    else:
        recurrence_line = f"RECURRENCE-ID{zone_param}:{day}T{master_dtstart[9:15]}"

    # NOTE(review): --start/--end are written as given (in the master's TZID
    # when it has one, floating otherwise) -- confirm this is intended for
    # masters whose DTSTART is in UTC.
    start_str = format_ical_date(datetime.fromisoformat(f"{args.date} {args.start}"))
    end_str = format_ical_date(datetime.fromisoformat(f"{args.date} {args.end}"))

    try:
        seq = int(master_sequence)
    except ValueError:
        seq = 0
    seq += 1

    dtstamp = datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')
    exception_lines = [
        'BEGIN:VEVENT',
        f"UID:{args.uid}",
        f"DTSTAMP:{dtstamp}",
        f"SEQUENCE:{seq}",
        recurrence_line,
        f"DTSTART{zone_param}:{start_str}",
        f"DTEND{zone_param}:{end_str}",
    ]
    # Master values are still in escaped wire form, so copy them verbatim.
    if master_summary:
        exception_lines.append(f"SUMMARY:{master_summary}")
    if master_description:
        exception_lines.append(f"DESCRIPTION:{master_description}")
    exception_lines.append('END:VEVENT')

    new_ical = insert_exception(ical_text, exception_lines)
    try:
        make_request(ical_url, method='PUT', body=new_ical, etag=etag)
        print(f"Created exception for UID {args.uid} on {args.date}")
    except Exception as e:
        print(f"Failed to create exception: {e}", file=sys.stderr)
        sys.exit(1)


def cmd_test(args):
    """Check that the configuration is complete and the server is reachable."""
    required = {
        'NEXTCLOUD_URL': NEXTCLOUD_URL,
        'NEXTCLOUD_USER': NEXTCLOUD_USER,
        'NEXTCLOUD_PASSWORD': NEXTCLOUD_PASSWORD,
        'CALDAV_PRINCIPAL': CALDAV_PRINCIPAL,
    }
    missing = [name for name, value in required.items() if not value]
    if missing:
        print("Missing environment variables: " + ", ".join(missing), file=sys.stderr)
        sys.exit(1)
    try:
        get_calendar_home()
        cals = get_calendars()
    except Exception as e:
        print(f"Connection failed: {e}", file=sys.stderr)
        sys.exit(1)
    print("Connection successful. Calendars available:")
    for cal in cals:
        print(f"- {cal['name']}")


def main():
    """Parse the command line and dispatch to the selected sub-command."""
    parser = argparse.ArgumentParser(description='Unified Nextcloud CalDAV CLI')
    sub = parser.add_subparsers(dest='cmd', required=True)

    sub.add_parser('list', help='List calendars').set_defaults(func=cmd_list)

    ev = sub.add_parser('events', help='View events')
    ev.add_argument('--calendar', help='Calendar name (default: Personal, else first)')
    grp = ev.add_mutually_exclusive_group()
    grp.add_argument('--today', action='store_true')
    grp.add_argument('--date', help='Specific date YYYY-MM-DD')
    grp.add_argument('--start', help='Start datetime ISO')
    ev.add_argument('--end', help='End datetime ISO (with --start)')
    ev.add_argument('--search', help='Text search in summary/description')
    ev.set_defaults(func=cmd_events)

    add = sub.add_parser('add', help='Add event')
    add.add_argument('--calendar', help='Calendar name')
    add.add_argument('--summary', required=True, help='Event title')
    add.add_argument('--start', required=True, help='Start datetime ISO (YYYY-MM-DD HH:MM:SS)')
    add.add_argument('--end', help='End datetime ISO (default: start + 1h)')
    add.add_argument('--recurrence', help='RRULE string (e.g., FREQ=WEEKLY;BYDAY=MO)')
    add.add_argument('--description', help='Event description')
    add.set_defaults(func=cmd_add)

    upd = sub.add_parser('update', help='Update existing event')
    upd.add_argument('--calendar', help='Calendar name')
    idgrp = upd.add_mutually_exclusive_group(required=True)
    idgrp.add_argument('--uid', help='Event UID to update')
    idgrp.add_argument('--summary', help='Event title (partial) match')
    upd.add_argument('--date', help='Date of event (required with --summary)')
    upd.add_argument('--set-summary', help='New summary')
    upd.add_argument('--set-start', help='New start datetime ISO')
    upd.add_argument('--set-end', help='New end datetime ISO')
    upd.add_argument('--set-recurrence', help='New RRULE (or empty to remove)')
    upd.set_defaults(func=cmd_update)

    delete = sub.add_parser('delete', help='Delete event')
    delete.add_argument('--calendar', help='Calendar name')
    delgrp = delete.add_mutually_exclusive_group(required=True)
    delgrp.add_argument('--uid', help='Event UID to delete')
    delgrp.add_argument('--summary', help='Event title match')
    delete.add_argument('--date', help='Date of event (required with --summary)')
    delete.set_defaults(func=cmd_delete)

    exc = sub.add_parser('exception', help='Create exception for recurring event instance')
    exc.add_argument('--calendar', help='Calendar name')
    exc.add_argument('--uid', required=True, help='Event UID')
    exc.add_argument('--date', required=True, help='Date of instance to override (YYYY-MM-DD)')
    exc.add_argument('--start', required=True, help='New start time (HH:MM)')
    exc.add_argument('--end', required=True, help='New end time (HH:MM)')
    exc.set_defaults(func=cmd_exception)

    sub.add_parser('test', help='Test connection and config').set_defaults(func=cmd_test)

    args = parser.parse_args()
    try:
        args.func(args)
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == '__main__':
    main()