209 lines
No EOL
6.7 KiB
Python
209 lines
No EOL
6.7 KiB
Python
#!/usr/bin/env python3
|
|
import argparse
|
|
import base64
|
|
import hashlib
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
|
|
ALLOWED_EXTS = {
|
|
".png", ".jpg", ".jpeg", ".heic", ".cr2", ".cr3", ".mp4", ".mov",
|
|
".webp", ".avif", ".gif",
|
|
}
|
|
|
|
DATETIME_KEYS = [
|
|
("Composite", "SubSecDateTimeOriginal"),
|
|
("Composite", "SubSecCreateDate"),
|
|
("ExifIFD", "DateTimeOriginal"),
|
|
("ExifIFD", "CreateDate"),
|
|
("XMP-xmp", "CreateDate"),
|
|
("Keys", "CreationDate"),
|
|
("QuickTime", "CreateDate"),
|
|
("XMP-photoshop", "DateCreated"),
|
|
]
|
|
|
|
|
|
def run(command: list[str], check: bool = True) -> subprocess.CompletedProcess:
|
|
return subprocess.run(command, text=True, capture_output=True, check=check)
|
|
|
|
|
|
def exiftool_data(file: Path) -> dict | None:
|
|
result = run([
|
|
"exiftool",
|
|
"-j",
|
|
"-a",
|
|
"-u",
|
|
"-g1",
|
|
"-time:all",
|
|
"-api", "QuickTimeUTC=1",
|
|
"-d", "%Y-%m-%dT%H:%M:%S%z",
|
|
str(file),
|
|
], check=False)
|
|
if result.returncode != 0:
|
|
return None
|
|
try:
|
|
data = __import__("json").loads(result.stdout)
|
|
return data[0] if data else None
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def exiftool_timestamp(file: Path) -> datetime | None:
|
|
data = exiftool_data(file)
|
|
if not data:
|
|
return None
|
|
|
|
for category, key in DATETIME_KEYS:
|
|
try:
|
|
value = data[category][key]
|
|
except (KeyError, TypeError):
|
|
continue
|
|
try:
|
|
return datetime.strptime(value, "%Y-%m-%dT%H:%M:%S%z")
|
|
except ValueError:
|
|
continue
|
|
|
|
return None
|
|
|
|
|
|
def short_hash(file: Path) -> str:
|
|
h = hashlib.sha256()
|
|
with file.open("rb") as fh:
|
|
for chunk in iter(lambda: fh.read(1024 * 1024), b""):
|
|
h.update(chunk)
|
|
digest = h.digest()
|
|
b64 = base64.b64encode(digest).decode("ascii")
|
|
return b64[:3].replace("/", "_").replace("+", "-")
|
|
|
|
|
|
def build_destination(dest_root: Path, file: Path, ts: datetime) -> Path:
|
|
ext = file.suffix.lower().lstrip(".")
|
|
year = ts.strftime("%Y")
|
|
month = ts.strftime("%m")
|
|
day = ts.strftime("%d")
|
|
hour = ts.strftime("%H")
|
|
minute = ts.strftime("%M")
|
|
second = ts.strftime("%S")
|
|
hash_part = short_hash(file)
|
|
|
|
raw_subdir = "raw" if ext in {"cr2", "cr3"} else None
|
|
month_dir = dest_root / f"{year}-{month}"
|
|
if raw_subdir:
|
|
month_dir = month_dir / raw_subdir
|
|
|
|
filename = f"{year}{month}{day}-{hour}{minute}{second}_{hash_part}.{ext}"
|
|
return month_dir / filename
|
|
|
|
|
|
def move_unsortable(file: Path, source_root: Path, unsortable_root: Path) -> None:
|
|
relpath = file.relative_to(source_root)
|
|
target_dir = (unsortable_root / relpath).parent
|
|
target_dir.mkdir(parents=True, exist_ok=True)
|
|
shutil.chown(str(target_dir), user="www-data", group="www-data")
|
|
target = target_dir / file.name
|
|
if target.exists():
|
|
return
|
|
shutil.move(str(file), str(target))
|
|
shutil.chown(str(target), user="www-data", group="www-data")
|
|
|
|
|
|
def move_sorted(file: Path, target: Path) -> None:
|
|
target.parent.mkdir(parents=True, exist_ok=True)
|
|
shutil.chown(str(target.parent), user="www-data", group="www-data")
|
|
shutil.move(str(file), str(target))
|
|
shutil.chown(str(target), user="www-data", group="www-data")
|
|
|
|
def process_file(file: Path, source_root: Path, dest_root: Path, unsortable_root: Path) -> tuple[Path, str]:
|
|
print(f"PROCESSING: {file}")
|
|
ts = exiftool_timestamp(file)
|
|
|
|
if ts is None:
|
|
print(f"UNSORTABLE: {file}")
|
|
move_unsortable(file, source_root, unsortable_root)
|
|
return file, "unsortable"
|
|
|
|
target = build_destination(dest_root, file, ts)
|
|
print(f"DESTINATION: {target}")
|
|
move_sorted(file, target)
|
|
return file, "sorted"
|
|
|
|
|
|
def scan_nextcloud(rel_source: str, rel_unsortable: str, rel_dest: str) -> None:
|
|
print("SCANNING...")
|
|
# run(["chown", "-R", "www-data:www-data", abs_source_path], check=True)
|
|
# run(["chmod", "-R", "770", abs_source_path], check=True)
|
|
|
|
# run(["chown", "-R", "www-data:www-data", abs_dest_path], check=True)
|
|
# run(["chown", "-R", "www-data:www-data", abs_unsortable_path], check=True)
|
|
# run(["chmod", "-R", "770", abs_dest_path], check=True)
|
|
# run(["chmod", "-R", "770", abs_unsortable_path], check=True)
|
|
|
|
run(["sudo", "-u", "www-data", "php", "/opt/nextcloud/occ", "files:scan", "--path", rel_source], check=True)
|
|
run(["sudo", "-u", "www-data", "php", "/opt/nextcloud/occ", "files:scan", "--path", rel_unsortable], check=True)
|
|
run(["sudo", "-u", "www-data", "php", "/opt/nextcloud/occ", "files:scan", "--path", rel_dest], check=True)
|
|
|
|
run(["systemctl", "start", "nextcloud-generate-new-previews.service"], check=True)
|
|
|
|
|
|
def iter_files(source_root: Path):
|
|
for path in source_root.rglob("*"):
|
|
if path.is_file() and path.suffix.lower() in ALLOWED_EXTS:
|
|
yield path
|
|
|
|
|
|
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser(
|
|
description="Sort Nextcloud media files by embedded timestamp."
|
|
)
|
|
parser.add_argument("nc_user")
|
|
parser.add_argument("source_subdir")
|
|
parser.add_argument("dest_subdir")
|
|
parser.add_argument("unsortable_subdir")
|
|
parser.add_argument("--workers", type=int, default=os.cpu_count() or 1)
|
|
args = parser.parse_args()
|
|
|
|
nc_user = args.nc_user
|
|
source_subdir = args.source_subdir
|
|
dest_subdir = args.dest_subdir
|
|
unsortable_subdir = args.unsortable_subdir
|
|
|
|
rel_source_path = f"/{nc_user}/files/{source_subdir}"
|
|
abs_source_path = f"/var/lib/nextcloud/{nc_user}/files/{source_subdir}"
|
|
|
|
rel_dest_path = f"/{nc_user}/files/{dest_subdir}"
|
|
abs_dest_path = f"/var/lib/nextcloud/{nc_user}/files/{dest_subdir}"
|
|
|
|
rel_unsortable_path = f"/{nc_user}/files/{unsortable_subdir}"
|
|
abs_unsortable_path = f"/var/lib/nextcloud/{nc_user}/files/{unsortable_subdir}"
|
|
|
|
source_root = Path(abs_source_path)
|
|
dest_root = Path(abs_dest_path)
|
|
unsortable_root = Path(abs_unsortable_path)
|
|
|
|
print("STARTING...")
|
|
|
|
run(["chown", "-R", "www-data:www-data", str(source_root)], check=True)
|
|
run(["chmod", "-R", "770", str(source_root)], check=True)
|
|
|
|
files = list(iter_files(source_root))
|
|
|
|
if not files:
|
|
print("NO MATCHING FILES FOUND.")
|
|
print("FINISH.")
|
|
raise SystemExit(0)
|
|
|
|
with ThreadPoolExecutor(max_workers=max(1, args.workers)) as executor:
|
|
futures = {
|
|
executor.submit(process_file, file, source_root, dest_root, unsortable_root): file
|
|
for file in files
|
|
}
|
|
for future in as_completed(futures):
|
|
future.result()
|
|
|
|
scan_nextcloud(rel_source_path, rel_unsortable_path, rel_dest_path)
|
|
|
|
print("FINISH.") |