Files
scarif/openclaw/workspace/nextcloud-calendar/scripts/ncal.py
2026-03-13 18:47:30 +00:00

623 lines
23 KiB
Python

#!/usr/bin/env python3
"""
Unified Nextcloud CalDAV CLI.
Commands:
list - List calendars
events - View events (supports --today, --date, --start/--end, --search)
add - Create event (--summary, --start, --end, --recurrence, --description)
update - Modify event (--uid OR --summary+--date, with --set-* options)
delete - Remove event (--uid OR --summary+--date)
exception - Create exception for recurring event instance (--uid, --date, --start, --end)
test - Verify connection
Requires environment variables:
NEXTCLOUD_URL, NEXTCLOUD_USER, NEXTCLOUD_PASSWORD, CALDAV_PRINCIPAL
"""
import os, sys, argparse, urllib.request, urllib.error, uuid, json
from datetime import datetime, date, timedelta, timezone
import xml.etree.ElementTree as ET
# Config from environment
NEXTCLOUD_URL = os.getenv('NEXTCLOUD_URL', '').rstrip('/')
NEXTCLOUD_USER = os.getenv('NEXTCLOUD_USER', '')
NEXTCLOUD_PASSWORD = os.getenv('NEXTCLOUD_PASSWORD', '')
CALDAV_PRINCIPAL = os.getenv('CALDAV_PRINCIPAL', '')
NS_DAV = '{DAV:}'
NS_CALDAV = '{urn:ietf:params:xml:ns:caldav}'
def make_request(url, method='PROPFIND', body=None, depth='1', etag=None):
pw = urllib.request.HTTPPasswordMgrWithDefaultRealm()
pw.add_password(None, NEXTCLOUD_URL, NEXTCLOUD_USER, NEXTCLOUD_PASSWORD)
opener = urllib.request.build_opener(urllib.request.HTTPBasicAuthHandler(pw))
req = urllib.request.Request(url, data=body.encode() if body else None, method=method)
req.add_header('Content-Type', 'text/xml; charset=utf-8')
if depth: req.add_header('Depth', depth)
if etag: req.add_header('If-Match', etag)
req.add_header('User-Agent', 'OpenClaw-Calendar/1.0')
return opener.open(req).read().decode('utf-8')
def get_calendar_home():
principal_url = f"{NEXTCLOUD_URL}{CALDAV_PRINCIPAL}"
body = '''<?xml version="1.0"?>
<d:propfind xmlns:d="DAV:" xmlns:c="urn:ietf:params:xml:ns:caldav">
<d:prop><c:calendar-home-set/></d:prop>
</d:propfind>'''
resp = make_request(principal_url, body=body, depth='0')
root = ET.fromstring(resp)
elem = root.find(f'.//{NS_CALDAV}calendar-home-set/{NS_DAV}href')
if elem is not None:
href = elem.text.strip()
return href if href.startswith('http') else f"{NEXTCLOUD_URL}{href}"
return f"{NEXTCLOUD_URL}/remote.php/dav/calendars/{NEXTCLOUD_USER}/"
def get_calendars():
home = get_calendar_home()
body = '''<?xml version="1.0"?>
<d:propfind xmlns:d="DAV:" xmlns:c="urn:ietf:params:xml:ns:caldav">
<d:prop><d:displayname/><c:supported-calendar-component-set/></d:prop>
</d:propfind>'''
resp = make_request(home, body=body, depth='1')
root = ET.fromstring(resp)
cals = []
for r in root.findall(f'{NS_DAV}response'):
href_el = r.find(f'{NS_DAV}href')
if href_el is None or not href_el.text.endswith('/'):
continue
prop = r.find(f'{NS_DAV}propstat/{NS_DAV}prop')
if prop is None:
continue
name_el = prop.find(f'{NS_DAV}displayname')
name = name_el.text if name_el is not None else href_el.text.strip('/').split('/')[-1]
cals.append({
'name': name,
'href': href_el.text,
'url': f"{NEXTCLOUD_URL}{href_el.text}" if href_el.text.startswith('/') else href_el.text
})
return cals
def get_calendar_url_by_name(name=None):
cals = get_calendars()
if not cals:
raise Exception("No calendars found")
if name:
for cal in cals:
if cal['name'] == name:
return cal['url']
raise Exception(f"Calendar '{name}' not found")
# Default to Personal calendar, fallback to first available
for cal in cals:
if cal['name'] == 'Personal':
return cal['url']
return cals[0]['url']
def parse_datetime_ical(dt_str):
dt_str = dt_str.strip()
if dt_str.endswith('Z'):
try:
return datetime.strptime(dt_str, '%Y%m%dT%H%M%SZ').replace(tzinfo=timezone.utc)
except ValueError:
pass
try:
return datetime.strptime(dt_str, '%Y%m%dT%H%M%S')
except ValueError:
try:
return datetime.strptime(dt_str, '%Y%m%d')
except ValueError:
return dt_str
def format_dt(dt):
if isinstance(dt, datetime):
return dt.strftime('%Y-%m-%d %H:%M')
if isinstance(dt, date):
return dt.strftime('%Y-%m-%d')
return str(dt)
def parse_ical_event(ical_text):
lines = ical_text.split('\n')
ev = {'summary': 'No title', 'start': None, 'end': None, 'description': '', 'uid': None, 'rrule': None}
key = None
val = ''
for line in lines:
stripped = line.strip()
if stripped.startswith((' ', '\t')):
if key:
val += stripped[1:]
continue
if key:
if key == 'SUMMARY': ev['summary'] = val
elif key == 'DESCRIPTION': ev['description'] = val
elif key == 'DTSTART': ev['start'] = parse_datetime_ical(val.split(';')[1] if ';' in val else val)
elif key == 'DTEND': ev['end'] = parse_datetime_ical(val.split(';')[1] if ';' in val else val)
elif key == 'UID': ev['uid'] = val
elif key == 'RRULE': ev['rrule'] = val
if ':' in stripped:
parts = stripped.split(':', 1)
key = parts[0].split(';')[0]
val = parts[1] if len(parts) > 1 else ''
if key:
if key == 'SUMMARY': ev['summary'] = val
elif key == 'DESCRIPTION': ev['description'] = val
elif key == 'DTSTART': ev['start'] = parse_datetime_ical(val.split(';')[1] if ';' in val else val)
elif key == 'DTEND': ev['end'] = parse_datetime_ical(val.split(';')[1] if ';' in val else val)
elif key == 'UID': ev['uid'] = val
elif key == 'RRULE': ev['rrule'] = val
return ev
def unfold_ical_lines(ical_text):
lines = ical_text.splitlines()
unfolded = []
for line in lines:
if line.startswith((' ', '\t')) and unfolded:
unfolded[-1] += line[1:]
else:
unfolded.append(line)
return unfolded
def parse_prop_line(line):
if ':' not in line:
return None, {}, None
name_params, value = line.split(':', 1)
parts = name_params.split(';')
name = parts[0].strip().upper()
params = {}
for p in parts[1:]:
if '=' in p:
k, v = p.split('=', 1)
params[k.upper()] = v
return name, params, value
def extract_master_event(ical_text, uid):
lines = unfold_ical_lines(ical_text)
events = []
current = None
for line in lines:
if line.strip() == 'BEGIN:VEVENT':
current = []
elif line.strip() == 'END:VEVENT':
if current is not None:
events.append(current)
current = None
elif current is not None:
current.append(line)
for ev_lines in events:
props = {}
has_recurrence_id = False
ev_uid = None
for line in ev_lines:
name, params, value = parse_prop_line(line)
if not name:
continue
if name == 'RECURRENCE-ID':
has_recurrence_id = True
if name == 'UID':
ev_uid = value
props[name] = (value, params)
if ev_uid == uid and not has_recurrence_id:
return props
return None
def insert_exception(ical_text, exception_lines):
insert_text = '\n'.join(exception_lines)
if 'END:VCALENDAR' not in ical_text:
raise Exception('Invalid VCALENDAR data')
head, tail = ical_text.rsplit('END:VCALENDAR', 1)
head = head.rstrip('\n')
new_text = head + '\n' + insert_text + '\nEND:VCALENDAR' + tail
return new_text
def format_ical_date(dt_obj):
return dt_obj.strftime('%Y%m%dT%H%M%S')
def query_events(calendar_url, start_dt, end_dt, search=None):
start_str = start_dt.strftime('%Y%m%dT%H%M%SZ')
end_str = end_dt.strftime('%Y%m%dT%H%M%SZ')
body = f'''<c:calendar-query xmlns:d="DAV:" xmlns:c="urn:ietf:params:xml:ns:caldav">
<d:prop><d:getetag/><c:calendar-data/></d:prop>
<c:filter>
<c:comp-filter name="VCALENDAR">
<c:comp-filter name="VEVENT">
<c:time-range start="{start_str}" end="{end_str}"/>
</c:comp-filter>
</c:comp-filter>
</c:filter>
</c:calendar-query>'''
try:
resp = make_request(calendar_url, method='REPORT', body=body, depth='1')
except Exception:
return []
root = ET.fromstring(resp)
events = []
for r in root.findall(f'{NS_DAV}response'):
href = r.find(f'{NS_DAV}href')
propstat = r.find(f'{NS_DAV}propstat')
if href is None or propstat is None:
continue
prop = propstat.find(f'{NS_DAV}prop')
if prop is None:
continue
etag = prop.find(f'{NS_DAV}getetag')
caldata = prop.find(f'{NS_CALDAV}calendar-data')
if caldata is not None and caldata.text:
ev = parse_ical_event(caldata.text)
if ev and (not search or search.lower() in ev.get('summary','').lower() or search.lower() in ev.get('description','').lower()):
ev['etag'] = etag.text if etag is not None else None
ev['href'] = href.text
events.append(ev)
return events
def ical_dump(ev):
ics = [
'BEGIN:VCALENDAR',
'VERSION:2.0',
'PRODID:-//OpenClaw//Calendar//EN',
'BEGIN:VEVENT'
]
if ev.get('uid'):
ics.append(f"UID:{ev['uid']}")
if ev.get('rrule'):
ics.append(f"RRULE:{ev['rrule']}")
start = ev.get('start')
if isinstance(start, datetime):
ics.append(f"DTSTART:{start.strftime('%Y%m%dT%H%M%S')}")
elif isinstance(start, date):
ics.append(f"DTSTART:{start.strftime('%Y%m%d')}")
end = ev.get('end')
if isinstance(end, datetime):
ics.append(f"DTEND:{end.strftime('%Y%m%dT%H%M%S')}")
elif isinstance(end, date):
ics.append(f"DTEND:{end.strftime('%Y%m%d')}")
ics.append(f"SUMMARY:{ev.get('summary','')}")
if ev.get('description'):
ics.append(f"DESCRIPTION:{ev.get('description','')}")
ics.extend(['END:VEVENT', 'END:VCALENDAR'])
return '\n'.join(ics)
def cmd_list(args):
cals = get_calendars()
if not cals:
print("No calendars found.")
return
print("Calendars:")
for c in cals:
print(f"- {c['name']}")
def cmd_events(args):
cal_url = get_calendar_url_by_name(args.calendar)
now = datetime.now()
if args.today:
start_dt = date(now.year, now.month, now.day)
end_dt = datetime.combine(start_dt + timedelta(days=1), datetime.min.time())
elif args.date:
start_dt = date.fromisoformat(args.date)
end_dt = datetime.combine(start_dt + timedelta(days=1), datetime.min.time())
elif args.start and args.end:
start_dt = datetime.fromisoformat(args.start)
end_dt = datetime.fromisoformat(args.end)
else:
start_dt = date(now.year, now.month, now.day)
end_dt = datetime.combine(start_dt + timedelta(days=1), datetime.min.time())
events = query_events(cal_url, start_dt, end_dt, search=args.search)
if not events:
print("No events found.")
return
events.sort(key=lambda e: e.get('start') or datetime.min)
for ev in events:
start = ev.get('start')
time_str = format_dt(start) if start else 'All-day'
print(f"[{time_str}] {ev.get('summary','')}")
def cmd_add(args):
cal_url = get_calendar_url_by_name(args.calendar)
start_dt = datetime.fromisoformat(args.start)
end_dt = datetime.fromisoformat(args.end) if args.end else start_dt + timedelta(hours=1)
uid = str(uuid.uuid4())
dtstamp = datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')
start_str = start_dt.strftime('%Y%m%dT%H%M%S')
end_str = end_dt.strftime('%Y%m%dT%H%M%S')
ics = [
'BEGIN:VCALENDAR',
'VERSION:2.0',
'PRODID:-//OpenClaw//Calendar//EN',
'BEGIN:VEVENT',
f"UID:{uid}",
f"DTSTAMP:{dtstamp}",
f"SUMMARY:{args.summary}",
f"DTSTART:{start_str}",
f"DTEND:{end_str}"
]
if args.recurrence:
ics.append(f"RRULE:{args.recurrence}")
if args.description:
ics.append(f"DESCRIPTION:{args.description}")
ics.extend(['END:VEVENT', 'END:VCALENDAR'])
event_url = f"{cal_url.rstrip('/')}/{uid}.ics"
try:
make_request(event_url, method='PUT', body='\n'.join(ics))
print(f"Added event: {args.summary}")
except Exception as e:
print(f"Failed to add event: {e}", file=sys.stderr)
sys.exit(1)
def cmd_update(args):
cal_url = get_calendar_url_by_name(args.calendar)
if args.uid:
now = datetime.now()
start = now - timedelta(days=365)
end = now + timedelta(days=365)
candidates = query_events(cal_url, start, end)
target = None
for ev in candidates:
if ev.get('uid') == args.uid:
target = ev
break
if not target:
print(f"Event with UID {args.uid} not found.", file=sys.stderr)
sys.exit(1)
href = target['href']
etag = target['etag']
ical_text = make_request(f"{NEXTCLOUD_URL}{href}" if href.startswith('/') else href, method='GET')
ev_data = parse_ical_event(ical_text)
else:
if not args.summary or not args.date:
print("Need either --uid or (--summary and --date) to identify event.", file=sys.stderr)
sys.exit(1)
target_date = date.fromisoformat(args.date)
start_dt = datetime.combine(target_date, datetime.min.time())
end_dt = datetime.combine(target_date + timedelta(days=1), datetime.min.time())
candidates = query_events(cal_url, start_dt, end_dt, search=args.summary)
if not candidates:
print(f"Event not found on {args.date} with summary containing '{args.summary}'.", file=sys.stderr)
sys.exit(1)
if len(candidates) > 1:
print(f"Multiple matches; narrow search or use --uid.", file=sys.stderr)
sys.exit(1)
ev_data = candidates[0]
href = ev_data['href']
etag = ev_data['etag']
ical_url = f"{NEXTCLOUD_URL}{href}" if href.startswith('/') else href
ical_text = make_request(ical_url, method='GET')
ev_data = parse_ical_event(ical_text)
if args.set_summary is not None:
ev_data['summary'] = args.set_summary
if args.set_start:
ev_data['start'] = datetime.fromisoformat(args.set_start)
if args.set_end:
ev_data['end'] = datetime.fromisoformat(args.set_end)
if args.set_recurrence is not None:
ev_data['rrule'] = args.set_recurrence if args.set_recurrence else None
new_ical = ical_dump(ev_data)
update_url = f"{NEXTCLOUD_URL}{href}" if href.startswith('/') else href
try:
make_request(update_url, method='PUT', body=new_ical, etag=etag)
print(f"Updated event: {ev_data.get('summary')}")
except Exception as e:
print(f"Failed to update: {e}", file=sys.stderr)
sys.exit(1)
def cmd_delete(args):
cal_url = get_calendar_url_by_name(args.calendar)
if args.uid:
now = datetime.now()
start = now - timedelta(days=365)
end = now + timedelta(days=365)
candidates = query_events(cal_url, start, end)
target = None
for ev in candidates:
if ev.get('uid') == args.uid:
target = ev
break
if not target:
print(f"Event with UID {args.uid} not found.", file=sys.stderr)
sys.exit(1)
href = target['href']
etag = target['etag']
elif args.date and args.summary:
target_date = date.fromisoformat(args.date)
start_dt = datetime.combine(target_date, datetime.min.time())
end_dt = datetime.combine(target_date + timedelta(days=1), datetime.min.time())
candidates = query_events(cal_url, start_dt, end_dt, search=args.summary)
if not candidates:
print("Event not found.", file=sys.stderr)
sys.exit(1)
if len(candidates) > 1:
print("Multiple matches; use --uid.", file=sys.stderr)
sys.exit(1)
target = candidates[0]
href = target['href']
etag = target['etag']
else:
print("Need --uid or both --date and --summary.", file=sys.stderr)
sys.exit(1)
delete_url = f"{NEXTCLOUD_URL}{href}" if href.startswith('/') else href
try:
make_request(delete_url, method='DELETE', etag=etag)
print(f"Deleted event: {target.get('summary')}")
except Exception as e:
print(f"Failed to delete: {e}", file=sys.stderr)
sys.exit(1)
def cmd_exception(args):
cal_url = get_calendar_url_by_name(args.calendar)
now = datetime.now()
start = now - timedelta(days=365)
end = now + timedelta(days=365*5)
candidates = query_events(cal_url, start, end)
target = None
for ev in candidates:
if ev.get('uid') == args.uid:
target = ev
break
if not target:
print(f"Event with UID {args.uid} not found.", file=sys.stderr)
sys.exit(1)
href = target['href']
etag = target['etag']
ical_url = f"{NEXTCLOUD_URL}{href}" if href.startswith('/') else href
ical_text = make_request(ical_url, method='GET')
master = extract_master_event(ical_text, args.uid)
if not master:
print(f"Master event for UID {args.uid} not found.", file=sys.stderr)
sys.exit(1)
master_dtstart, dtstart_params = master.get('DTSTART', (None, {}))
master_dtend, dtend_params = master.get('DTEND', (None, {}))
master_summary = master.get('SUMMARY', ('', {}))[0]
master_description = master.get('DESCRIPTION', ('', {}))[0]
master_sequence = master.get('SEQUENCE', ('0', {}))[0] or '0'
if not master_dtstart:
print("Master event missing DTSTART.", file=sys.stderr)
sys.exit(1)
tzid = dtstart_params.get('TZID')
# Recurrence-id uses original instance datetime (same time-of-day as master)
if master_dtstart.endswith('Z'):
recurrence_id = f"{args.date.replace('-', '')}T{master_dtstart[9:15]}Z"
else:
time_part = master_dtstart[9:15] if 'T' in master_dtstart else '000000'
recurrence_id = f"{args.date.replace('-', '')}T{time_part}"
start_dt = datetime.fromisoformat(f"{args.date} {args.start}")
end_dt = datetime.fromisoformat(f"{args.date} {args.end}")
start_str = format_ical_date(start_dt)
end_str = format_ical_date(end_dt)
seq = 0
try:
seq = int(master_sequence)
except ValueError:
seq = 0
seq += 1
dtstamp = datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')
exception_lines = [
'BEGIN:VEVENT',
f"UID:{args.uid}",
f"DTSTAMP:{dtstamp}",
f"SEQUENCE:{seq}",
]
if tzid:
exception_lines.append(f"RECURRENCE-ID;TZID={tzid}:{recurrence_id}")
exception_lines.append(f"DTSTART;TZID={tzid}:{start_str}")
exception_lines.append(f"DTEND;TZID={tzid}:{end_str}")
else:
exception_lines.append(f"RECURRENCE-ID:{recurrence_id}")
exception_lines.append(f"DTSTART:{start_str}")
exception_lines.append(f"DTEND:{end_str}")
if master_summary:
exception_lines.append(f"SUMMARY:{master_summary}")
if master_description:
exception_lines.append(f"DESCRIPTION:{master_description}")
exception_lines.append('END:VEVENT')
new_ical = insert_exception(ical_text, exception_lines)
try:
make_request(ical_url, method='PUT', body=new_ical, etag=etag)
print(f"Created exception for UID {args.uid} on {args.date}")
except Exception as e:
print(f"Failed to create exception: {e}", file=sys.stderr)
sys.exit(1)
def cmd_test(args):
missing = []
if not NEXTCLOUD_URL: missing.append("NEXTCLOUD_URL")
if not NEXTCLOUD_USER: missing.append("NEXTCLOUD_USER")
if not NEXTCLOUD_PASSWORD: missing.append("NEXTCLOUD_PASSWORD")
if not CALDAV_PRINCIPAL: missing.append("CALDAV_PRINCIPAL")
if missing:
print("Missing environment variables: " + ", ".join(missing))
sys.exit(1)
try:
get_calendar_home()
print("Connection successful. Calendars available:")
for cal in get_calendars():
print(f"- {cal['name']}")
except Exception as e:
print(f"Connection failed: {e}")
sys.exit(1)
def main():
parser = argparse.ArgumentParser(description='Unified Nextcloud CalDAV CLI')
sub = parser.add_subparsers(dest='cmd', required=True)
sub.add_parser('list', help='List calendars').set_defaults(func=cmd_list)
ev = sub.add_parser('events', help='View events')
ev.add_argument('--calendar', help='Calendar name (default: first)')
grp = ev.add_mutually_exclusive_group()
grp.add_argument('--today', action='store_true')
grp.add_argument('--date', help='Specific date YYYY-MM-DD')
grp.add_argument('--start', help='Start datetime ISO')
ev.add_argument('--end', help='End datetime ISO (with --start)')
ev.add_argument('--search', help='Text search in summary/description')
ev.set_defaults(func=cmd_events)
add = sub.add_parser('add', help='Add event')
add.add_argument('--calendar', help='Calendar name')
add.add_argument('--summary', required=True, help='Event title')
add.add_argument('--start', required=True, help='Start datetime ISO (YYYY-MM-DD HH:MM:SS)')
add.add_argument('--end', help='End datetime ISO (default: start + 1h)')
add.add_argument('--recurrence', help='RRULE string (e.g., FREQ=WEEKLY;BYDAY=MO)')
add.add_argument('--description', help='Event description')
add.set_defaults(func=cmd_add)
upd = sub.add_parser('update', help='Update existing event')
upd.add_argument('--calendar', help='Calendar name')
idgrp = upd.add_mutually_exclusive_group(required=True)
idgrp.add_argument('--uid', help='Event UID to update')
idgrp.add_argument('--summary', help='Event title (partial) match')
upd.add_argument('--date', help='Date of event (required with --summary)')
upd.add_argument('--set-summary', help='New summary')
upd.add_argument('--set-start', help='New start datetime ISO')
upd.add_argument('--set-end', help='New end datetime ISO')
upd.add_argument('--set-recurrence', help='New RRULE (or empty to remove)')
upd.set_defaults(func=cmd_update)
delete = sub.add_parser('delete', help='Delete event')
delete.add_argument('--calendar', help='Calendar name')
delgrp = delete.add_mutually_exclusive_group(required=True)
delgrp.add_argument('--uid', help='Event UID to delete')
delgrp.add_argument('--summary', help='Event title match')
delete.add_argument('--date', help='Date of event (required with --summary)')
delete.set_defaults(func=cmd_delete)
exc = sub.add_parser('exception', help='Create exception for recurring event instance')
exc.add_argument('--calendar', help='Calendar name')
exc.add_argument('--uid', required=True, help='Event UID')
exc.add_argument('--date', required=True, help='Date of instance to override (YYYY-MM-DD)')
exc.add_argument('--start', required=True, help='New start time (HH:MM)')
exc.add_argument('--end', required=True, help='New end time (HH:MM)')
exc.set_defaults(func=cmd_exception)
sub.add_parser('test', help='Test connection and config').set_defaults(func=cmd_test)
args = parser.parse_args()
try:
args.func(args)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == '__main__':
main()