bin/timestamp_icloud_photos_for_nextcloud: introduce
This commit is contained in:
parent
fcd92db125
commit
60c2c42a49
1 changed files with 216 additions and 0 deletions
216
bin/timestamp_icloud_photos_for_nextcloud
Normal file
216
bin/timestamp_icloud_photos_for_nextcloud
Normal file
|
|
@ -0,0 +1,216 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
from subprocess import check_output, CalledProcessError
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
import json
|
||||
from argparse import ArgumentParser
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from os import cpu_count
|
||||
from time import sleep
|
||||
|
||||
EXT_GROUPS = {
|
||||
"quicktime": {".mp4", ".mov", ".heic", ".cr3"},
|
||||
"exif": {".jpg", ".jpeg", ".cr2"},
|
||||
}
|
||||
DATETIME_KEYS = [
|
||||
("Composite", "SubSecDateTimeOriginal"),
|
||||
("Composite", "SubSecCreateDate"),
|
||||
('ExifIFD', 'DateTimeOriginal'),
|
||||
('ExifIFD', 'CreateDate'),
|
||||
('XMP-xmp', 'CreateDate'),
|
||||
('Keys', 'CreationDate'),
|
||||
('QuickTime', 'CreateDate'),
|
||||
('XMP-photoshop', 'DateCreated'),
|
||||
]
|
||||
|
||||
def run(command):
|
||||
return check_output(command, text=True).strip()
|
||||
|
||||
|
||||
def mdls_timestamp(file):
|
||||
for i in range(5): # retry a few times in case of transient mdls failures
|
||||
try:
|
||||
output = run(('mdls', '-raw', '-name', 'kMDItemContentCreationDate', file))
|
||||
except CalledProcessError as e:
|
||||
print(f"{file}: Error running mdls (attempt {i+1}/5): {e}")
|
||||
continue
|
||||
|
||||
try:
|
||||
return datetime.strptime(output, "%Y-%m-%d %H:%M:%S %z")
|
||||
except ValueError as e:
|
||||
print(f"{file}: Error parsing mdls output (attempt {i+1}/5): {e}")
|
||||
continue
|
||||
|
||||
sleep(1)
|
||||
|
||||
raise RuntimeError(f"Failed to get mdls timestamp for {file} after 5 attempts")
|
||||
|
||||
|
||||
def exiftool_data(file):
|
||||
try:
|
||||
output = run((
|
||||
'exiftool',
|
||||
'-j', # json
|
||||
'-a', # unknown tags
|
||||
'-u', # unknown values
|
||||
'-g1', # group by category
|
||||
'-time:all', # all time tags
|
||||
'-api', 'QuickTimeUTC=1', # use UTC for QuickTime timestamps
|
||||
'-d', '%Y-%m-%dT%H:%M:%S%z',
|
||||
file,
|
||||
))
|
||||
except CalledProcessError as e:
|
||||
print(f"Error running exiftool: {e}")
|
||||
return None
|
||||
else:
|
||||
return json.loads(output)[0]
|
||||
|
||||
def exiftool_timestamp(file):
|
||||
data = exiftool_data(file)
|
||||
for category, key in DATETIME_KEYS:
|
||||
try:
|
||||
value = data[category][key]
|
||||
return category, key, datetime.strptime(value, '%Y-%m-%dT%H:%M:%S%z')
|
||||
except (TypeError, KeyError, ValueError) as e:
|
||||
continue
|
||||
print(f"⚠️ {file}: No timestamp found in exiftool: " + json.dumps(data, indent=2))
|
||||
return None, None, None
|
||||
|
||||
|
||||
def photo_has_embedded_timestamp(file):
|
||||
mdls_ts = mdls_timestamp(file)
|
||||
category, key, exiftool_ts = exiftool_timestamp(file)
|
||||
|
||||
if not exiftool_ts:
|
||||
print(f"⚠️ {file}: No timestamp found in exiftool")
|
||||
return False
|
||||
|
||||
# normalize timezone for comparison
|
||||
exiftool_ts = exiftool_ts.astimezone(mdls_ts.tzinfo)
|
||||
delta = abs(mdls_ts - exiftool_ts)
|
||||
|
||||
if delta < timedelta(hours=1): # allow for small differences
|
||||
print(f"✅ {file}: {mdls_ts.isoformat()} (#{category}:{key})")
|
||||
return True
|
||||
else:
|
||||
print(f"⚠️ {file}: {mdls_ts.isoformat()} != {exiftool_ts} (Δ={delta})")
|
||||
return False
|
||||
|
||||
|
||||
def photos_without_embedded_timestamps(directory):
|
||||
executor = ThreadPoolExecutor(max_workers=cpu_count()//2)
|
||||
try:
|
||||
futures = {
|
||||
executor.submit(photo_has_embedded_timestamp, file): file
|
||||
for file in directory.iterdir()
|
||||
if file.is_file()
|
||||
if file.suffix.lower() not in {".aae"}
|
||||
if not file.name.startswith('.')
|
||||
}
|
||||
|
||||
for future in as_completed(futures):
|
||||
file = futures[future]
|
||||
has_ts = future.result() # raises immediately on first failed future
|
||||
|
||||
if has_ts:
|
||||
file.rename(file.parent / 'ok' / file.name)
|
||||
else:
|
||||
yield file
|
||||
|
||||
except Exception:
|
||||
executor.shutdown(wait=False, cancel_futures=True)
|
||||
raise
|
||||
else:
|
||||
executor.shutdown(wait=True)
|
||||
|
||||
|
||||
def exiftool_write(file, assignments):
|
||||
print(f"🔵 {file}: Writing -- {assignments}")
|
||||
return run((
|
||||
"exiftool", "-overwrite_original",
|
||||
"-api", "QuickTimeUTC=1",
|
||||
*[
|
||||
f"-{group}:{tag}={value}"
|
||||
for group, tag, value in assignments
|
||||
],
|
||||
str(file),
|
||||
))
|
||||
|
||||
|
||||
def add_missing_timestamp(file):
|
||||
data = exiftool_data(file)
|
||||
mdls_ts = mdls_timestamp(file)
|
||||
|
||||
offset = mdls_ts.strftime("%z")
|
||||
offset = f"{offset[:3]}:{offset[3:]}" if len(offset) == 5 else offset
|
||||
|
||||
exif_ts = mdls_ts.strftime("%Y:%m:%d %H:%M:%S")
|
||||
qt_ts = mdls_ts.strftime("%Y:%m:%d %H:%M:%S")
|
||||
qt_ts_tz = f"{qt_ts}{offset}"
|
||||
ext = file.suffix.lower()
|
||||
|
||||
try:
|
||||
if ext in {".heic"}:
|
||||
exiftool_write(file, [
|
||||
("ExifIFD", "DateTimeOriginal", qt_ts),
|
||||
("ExifIFD", "CreateDate", qt_ts),
|
||||
("ExifIFD", "OffsetTime", offset),
|
||||
("ExifIFD", "OffsetTimeOriginal", offset),
|
||||
("ExifIFD", "OffsetTimeDigitized", offset),
|
||||
("QuickTime", "CreateDate", qt_ts_tz),
|
||||
("Keys", "CreationDate", qt_ts_tz),
|
||||
("XMP-xmp", "CreateDate", qt_ts_tz),
|
||||
])
|
||||
elif "QuickTime" in data or ext in {".mp4", ".mov", ".heic", ".cr3"}:
|
||||
exiftool_write(file, [
|
||||
("QuickTime", "CreateDate", qt_ts_tz),
|
||||
("Keys", "CreationDate", qt_ts_tz),
|
||||
])
|
||||
elif "ExifIFD" in data or ext in {".jpg", ".jpeg", ".cr2", ".webp"}:
|
||||
exiftool_write(file, [
|
||||
("ExifIFD", "DateTimeOriginal", exif_ts),
|
||||
("ExifIFD", "CreateDate", exif_ts),
|
||||
("IFD0", "ModifyDate", exif_ts),
|
||||
("ExifIFD", "OffsetTime", offset),
|
||||
("ExifIFD", "OffsetTimeOriginal", offset),
|
||||
("ExifIFD", "OffsetTimeDigitized", offset),
|
||||
])
|
||||
elif ext in {".png", ".gif", ".avif"}:
|
||||
exiftool_write(file, [
|
||||
("XMP-xmp", "CreateDate", qt_ts_tz),
|
||||
("XMP-photoshop", "DateCreated", exif_ts),
|
||||
])
|
||||
else:
|
||||
print(f"❌ {file}: unsupported type, skipped")
|
||||
return
|
||||
|
||||
if photo_has_embedded_timestamp(file):
|
||||
print(f"✅ {file}: Timestamp successfully added: {mdls_ts.isoformat()}")
|
||||
file.rename(file.parent / 'processed' / file.name)
|
||||
return
|
||||
else:
|
||||
category, key, exiftool_ts = exiftool_timestamp(file)
|
||||
print(f"❌ {file}: Timestamp still wrong/missing after write '{category}:{key}:{exiftool_ts}': #{json.dumps(data, indent=4)}")
|
||||
return
|
||||
except CalledProcessError as e:
|
||||
print(f"❌ {file}: Failed to write timestamp: {e}")
|
||||
return
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = ArgumentParser(description="Print timestamps of photos in the current directory.")
|
||||
parser.add_argument("-d", "--directory", help="Directory to scan for photos")
|
||||
args = parser.parse_args()
|
||||
|
||||
directory = Path(args.directory)
|
||||
(directory/'ok').mkdir(exist_ok=True)
|
||||
(directory/'processed').mkdir(exist_ok=True)
|
||||
|
||||
_photos_without_embedded_timestamps = list(photos_without_embedded_timestamps(directory))
|
||||
print(f"{len(_photos_without_embedded_timestamps)} photos without embedded timestamps found.")
|
||||
print("Press Enter to add missing timestamps...")
|
||||
input()
|
||||
|
||||
for file in _photos_without_embedded_timestamps:
|
||||
add_missing_timestamp(file)
|
||||
Loading…
Reference in a new issue