feat: Refactor get_invalid_folders to work with a queryset instead of a list of links
parent dae606de6e
commit 404f333e17
3 changed files with 50 additions and 45 deletions
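The gist of the change: the folder-status helpers in the index module now accept the Snapshot queryset directly instead of a pre-materialized list of Link objects, and the init() call site passes the whole index through in a single call. A minimal sketch of the before/after call shape, for orientation only (the Snapshot.objects manager spelling is an assumption for illustration and does not appear in this diff):

    # Before this commit: callers had to materialize Link objects up front
    #   invalid = get_invalid_folders([s.as_link() for s in snapshots], out_dir=out_dir)
    # After this commit: callers hand over the queryset and the helpers stream/filter it lazily
    #   snapshots = Snapshot.objects.all()   # assumed Django manager call, not shown in this diff
    #   invalid = get_invalid_folders(snapshots, out_dir=out_dir)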
@@ -54,7 +54,7 @@ def main(args: Optional[List[str]]=None, stdin: Optional[IO]=None, pwd: Optional

     oneshot(
         url=stdin_url or url,
-        out_dir=str(Path(command.out_dir).absolute()),
+        out_dir=str(Path(command.out_dir).resolve()),
     )

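Context for the one-line change above: Path.absolute() only prepends the current working directory, while Path.resolve() also collapses '..' segments and follows symlinks, so the resulting out_dir is a canonical path. A quick standalone illustration (the example path is made up):

    from pathlib import Path

    p = Path('data/../data')
    print(p.absolute())   # <cwd>/data/../data  -- cwd prepended, '..' left in place
    print(p.resolve())    # <cwd>/data          -- normalized, symlinks resolved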
@@ -4,6 +4,7 @@ import re
 import os
 import shutil
 import json as pyjson
 from pathlib import Path

+from itertools import chain
 from typing import List, Tuple, Dict, Optional, Iterable
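The new itertools.chain import supports the loop further down in this diff that walks the already-indexed folders and the remaining on-disk data folders as one sequence. A tiny standalone example of the same idiom (the paths are made up):

    from itertools import chain

    indexed_folders = {'archive/1600000001', 'archive/1600000002'}
    data_folders = ['archive/1600000003']

    # chain() lazily concatenates the two iterables into a single pass
    for path in chain(sorted(indexed_folders), sorted(data_folders)):
        print(path)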
@@ -418,28 +419,32 @@ def get_valid_folders(links, out_dir: str=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
         for link in filter(is_valid, links)
     }

-def get_invalid_folders(links, out_dir: str=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
+def get_invalid_folders(snapshots, out_dir: str=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
     """dirs that are invalid for any reason: corrupted/duplicate/orphaned/unrecognized"""
-    duplicate = get_duplicate_folders(links, out_dir=OUTPUT_DIR)
-    orphaned = get_orphaned_folders(links, out_dir=OUTPUT_DIR)
-    corrupted = get_corrupted_folders(links, out_dir=OUTPUT_DIR)
-    unrecognized = get_unrecognized_folders(links, out_dir=OUTPUT_DIR)
+    duplicate = get_duplicate_folders(snapshots, out_dir=OUTPUT_DIR)
+    orphaned = get_orphaned_folders(snapshots, out_dir=OUTPUT_DIR)
+    corrupted = get_corrupted_folders(snapshots, out_dir=OUTPUT_DIR)
+    unrecognized = get_unrecognized_folders(snapshots, out_dir=OUTPUT_DIR)
     return {**duplicate, **orphaned, **corrupted, **unrecognized}


-def get_duplicate_folders(links, out_dir: str=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
+def get_duplicate_folders(snapshots, out_dir: str=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
     """dirs that conflict with other directories that have the same link URL or timestamp"""
-    links = list(links)
-    by_url = {link.url: 0 for link in links}
-    by_timestamp = {link.timestamp: 0 for link in links}
+    by_url = {}
+    by_timestamp = {}
+    indexed_folders = set()
+    for snapshot in snapshots.iterator():
+        link = snapshot.as_link()
+        by_url[link.url] = 0
+        by_timestamp[link.timestamp] = 0
+        indexed_folders.update([link.link_dir])

     duplicate_folders = {}

-    indexed_folders = {link.link_dir for link in links}
     data_folders = (
         entry.path
-        for entry in os.scandir(os.path.join(out_dir, ARCHIVE_DIR_NAME))
-        if entry.is_dir(follow_symlinks=True) and entry.path not in indexed_folders
+        for entry in (Path(out_dir) / ARCHIVE_DIR_NAME).iterdir()
+        if entry.is_dir() and str(entry) not in indexed_folders
     )

     for path in chain(sorted(indexed_folders), sorted(data_folders)):
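In get_duplicate_folders, the old code materialized the whole input with list(links) and built its lookup dicts from that list; the new code streams the queryset with .iterator() and fills by_url, by_timestamp, and indexed_folders in a single pass, so rows are pulled one at a time rather than all at once. A standalone sketch of that accumulation pattern, with a plain generator standing in for the real Django queryset:

    from collections import namedtuple

    Link = namedtuple('Link', 'url timestamp link_dir')

    def fake_snapshot_links():
        # stand-in for (s.as_link() for s in snapshots.iterator()); yields one row at a time
        yield Link('https://example.com/a', '1600000001', 'archive/1600000001')
        yield Link('https://example.com/b', '1600000002', 'archive/1600000002')

    by_url, by_timestamp, indexed_folders = {}, {}, set()
    for link in fake_snapshot_links():
        by_url[link.url] = 0
        by_timestamp[link.timestamp] = 0
        indexed_folders.add(link.link_dir)

    print(sorted(indexed_folders))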
@@ -462,71 +467,70 @@ def get_duplicate_folders(links, out_dir: str=OUTPUT_DIR) -> Dict[str, Optional[Link]]:

     return duplicate_folders

-def get_orphaned_folders(links, out_dir: str=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
+def get_orphaned_folders(snapshots, out_dir: str=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
     """dirs that contain a valid index but aren't listed in the main index"""
-    links = list(links)
-    indexed_folders = {link.link_dir: link for link in links}
     orphaned_folders = {}

-    for entry in os.scandir(os.path.join(out_dir, ARCHIVE_DIR_NAME)):
-        if entry.is_dir(follow_symlinks=True):
+    for entry in (Path(out_dir) / ARCHIVE_DIR_NAME).iterdir():
+        if entry.is_dir():
             link = None
             try:
-                link = parse_json_link_details(entry.path)
+                link = parse_json_link_details(str(entry))
             except Exception:
                 pass

-            if link and entry.path not in indexed_folders:
+            if link and not snapshots.filter(timestamp=entry.name).exists():
                 # folder is a valid link data dir with index details, but it's not in the main index
-                orphaned_folders[entry.path] = link
+                orphaned_folders[str(entry)] = link

     return orphaned_folders

-def get_corrupted_folders(links, out_dir: str=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
+def get_corrupted_folders(snapshots, out_dir: str=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
     """dirs that don't contain a valid index and aren't listed in the main index"""
-    return {
-        link.link_dir: link
-        for link in filter(is_corrupt, links)
-    }
+    corrupted = {}
+    for snapshot in snapshots.iterator():
+        link = snapshot.as_link()
+        if is_corrupt(link):
+            corrupted[link.link_dir] = link
+    return corrupted

-def get_unrecognized_folders(links, out_dir: str=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
+def get_unrecognized_folders(snapshots, out_dir: str=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
     """dirs that don't contain recognizable archive data and aren't listed in the main index"""
-    by_timestamp = {link.timestamp: 0 for link in links}
     unrecognized_folders: Dict[str, Optional[Link]] = {}

-    for entry in os.scandir(os.path.join(out_dir, ARCHIVE_DIR_NAME)):
-        if entry.is_dir(follow_symlinks=True):
-            index_exists = os.path.exists(os.path.join(entry.path, 'index.json'))
+    for entry in (Path(out_dir) / ARCHIVE_DIR_NAME).iterdir():
+        if entry.is_dir():
+            index_exists = (entry / "index.json").exists()
             link = None
             try:
-                link = parse_json_link_details(entry.path)
+                link = parse_json_link_details(str(entry))
             except KeyError:
                 # Try to fix index
                 if index_exists:
                     try:
                         # Last attempt to repair the detail index
-                        link_guessed = parse_json_link_details(entry.path, guess=True)
-                        write_json_link_details(link_guessed, out_dir=entry.path)
-                        link = parse_json_link_details(entry.path)
+                        link_guessed = parse_json_link_details(str(entry), guess=True)
+                        write_json_link_details(link_guessed, out_dir=str(entry))
+                        link = parse_json_link_details(str(entry))
                     except Exception:
                         pass

             if index_exists and link is None:
                 # index exists but it's corrupted or unparseable
-                unrecognized_folders[entry.path] = link
+                unrecognized_folders[str(entry)] = link

             elif not index_exists:
                 # link details index doesn't exist and the folder isn't in the main index
-                timestamp = entry.path.rsplit('/', 1)[-1]
-                if timestamp not in by_timestamp:
-                    unrecognized_folders[entry.path] = link
+                timestamp = entry.name
+                if not snapshots.filter(timestamp=timestamp).exists():
+                    unrecognized_folders[str(entry)] = link

     return unrecognized_folders


 def is_valid(link: Link) -> bool:
-    dir_exists = os.path.exists(link.link_dir)
-    index_exists = os.path.exists(os.path.join(link.link_dir, 'index.json'))
+    dir_exists = Path(link.link_dir).exists()
+    index_exists = (Path(link.link_dir) / "index.json").exists()
     if not dir_exists:
         # unarchived links are not included in the valid list
         return False
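Across get_orphaned_folders and get_unrecognized_folders, membership checks against the old in-memory indexed_folders / by_timestamp dicts are replaced by per-directory snapshots.filter(timestamp=entry.name).exists() queries: one small existence lookup per archive folder instead of a dict built from every link up front. A rough, self-contained sketch of that check, with a minimal stand-in for the queryset (the real Snapshot model behind it is assumed, not shown in this diff):

    from pathlib import Path

    class FakeSnapshotQuerySet:
        """Stand-in supporting the .filter(timestamp=...).exists() shape used above."""
        def __init__(self, timestamps):
            self.timestamps = set(timestamps)

        def filter(self, timestamp=None):
            kept = {t for t in self.timestamps if timestamp is None or t == timestamp}
            return FakeSnapshotQuerySet(kept)

        def exists(self):
            return bool(self.timestamps)

    snapshots = FakeSnapshotQuerySet(['1600000001', '1600000002'])
    entry = Path('archive/1600000003')                  # made-up folder path
    if not snapshots.filter(timestamp=entry.name).exists():
        print(f'{entry} is not in the main index')      # orphan/unrecognized candidate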
@@ -541,7 +545,7 @@ def is_valid(link: Link) -> bool:
         return False

 def is_corrupt(link: Link) -> bool:
-    if not os.path.exists(link.link_dir):
+    if not Path(link.link_dir).exists():
         # unarchived links are not considered corrupt
         return False

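The is_valid and is_corrupt helpers above swap os.path calls for the pathlib equivalents already imported at the top of the file; the checks themselves behave the same. A quick side-by-side of the two spellings (the example path is made up):

    import os
    from pathlib import Path

    link_dir = 'archive/1600000001'   # made-up example path

    # old spelling
    old_dir_exists = os.path.exists(link_dir)
    old_index_exists = os.path.exists(os.path.join(link_dir, 'index.json'))

    # new spelling, equivalent checks via pathlib
    new_dir_exists = Path(link_dir).exists()
    new_index_exists = (Path(link_dir) / 'index.json').exists()

    assert (old_dir_exists, old_index_exists) == (new_dir_exists, new_index_exists)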
@@ -352,9 +352,10 @@ def init(force: bool=False, out_dir: str=OUTPUT_DIR) -> None:
        print(' {lightyellow}√ Added {} orphaned links from existing archive directories.{reset}'.format(len(orphaned_data_dir_links), **ANSI))

    # Links in invalid/duplicate data dirs
-    invalid_folders: Dict[str, Link] = {}
-    for link in all_links:
-        invalid_folders.update(get_invalid_folders([link.as_link()], out_dir=out_dir).items())
+    invalid_folders = {
+        folder: link
+        for folder, link in get_invalid_folders(all_links, out_dir=out_dir).items()
+    }
    if invalid_folders:
        print(' {lightyellow}! Skipped adding {} invalid link data directories.{reset}'.format(len(invalid_folders), **ANSI))
        print(' X ' + '\n X '.join(f'{folder} {link}' for folder, link in invalid_folders.items()))
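With the helpers taking a queryset, init() now makes a single get_invalid_folders() call over all_links instead of looping over every link and calling it with a one-element list, so the archive directory is only scanned once rather than once per link. An illustrative back-of-the-envelope comparison (the counts are made up):

    # Old shape: each of N per-link calls re-walked all M archive directories -> ~N * M checks
    # New shape: one call walks the M directories once                        -> ~M checks
    num_links, num_dirs = 1_000, 1_000
    print('old ~', num_links * num_dirs, 'directory checks; new ~', num_dirs)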