#!/usr/bin/env python3 import argparse import base64 import ctypes import json import os import resource import sys import struct import time import zipfile import zlib from zlib import crc32 from typing import Any, BinaryIO, Dict, Generator, List, Optional, Tuple compressed_extensions: Dict[int, str] = { zipfile.ZIP_STORED: "", zipfile.ZIP_DEFLATED: ".zlib", } if sys.platform != "linux" and struct.calcsize("P") != 8: # TODO: test cross-platform raise RuntimeError("Only 64-bit linux is supported") # int fallocate(int fd, int mode, off_t offset, off_t len) libc = ctypes.cdll.LoadLibrary("libc.so.6") fallocate = libc.fallocate fallocate.argtypes = (ctypes.c_int, ctypes.c_int, ctypes.c_longlong, ctypes.c_longlong) FALLOC_FL_KEEP_SIZE = 1 FALLOC_FL_PUNCH_HOLE = 2 def check_zipinfo(zipinfo: zipfile.ZipInfo) -> None: """ Checks that zipinfo is compatible with this script. Raises ValueError on first error. """ # Check info flags if zipinfo.flag_bits & (1 << 0): raise ValueError("file is encrypted") if zipinfo.flag_bits & (1 << 5): raise ValueError("file has compressed patched data (flag bit 5)") if zipinfo.flag_bits & (1 << 6): raise ValueError("file has strong encryption (flag bit 6)") # Check compression if zipinfo.compress_type not in (zipfile.ZIP_STORED, zipfile.ZIP_DEFLATED): raise ValueError(f"file has unsupported compression type {zipinfo.compress_type}") def check_file_header(fp: BinaryIO) -> Tuple[bytes, int]: """ Tries to read the file header inside the zip archive and returns the offset of the beginning of the file (fp must be seekable). """ raw_header = fp.read(zipfile.sizeFileHeader) if len(raw_header) != zipfile.sizeFileHeader: raise zipfile.BadZipFile("Truncated file header") fheader = struct.unpack(zipfile.structFileHeader, raw_header) if fheader[zipfile._FH_SIGNATURE] != zipfile.stringFileHeader: raise zipfile.BadZipFile("Bad magic number for file header") raw_header += fp.read(fheader[zipfile._FH_FILENAME_LENGTH]) if fheader[zipfile._FH_EXTRA_FIELD_LENGTH]: raw_header += fp.read(fheader[zipfile._FH_EXTRA_FIELD_LENGTH]) return raw_header, fp.tell() def analyze_zipinfo( fp: BinaryIO, zipinfo: zipfile.ZipInfo, *, calc_actual_crc: bool = False, ) -> Dict[str, Any]: """ Calculates and returns some meta information that can be stored to json file and can be used to extract this file from archive. """ check_zipinfo(zipinfo) # Check file header fp.seek(zipinfo.header_offset, 0) raw_header, file_offset = check_file_header(fp) info = { "filename": zipinfo.filename, "crc32": zipinfo.CRC, "offset": file_offset, "compressed_size": zipinfo.compress_size, "uncompressed_size": zipinfo.file_size, "compress_type": zipinfo.compress_type, "mtime": time.mktime(zipinfo.date_time + (0, 0, -1)), "raw_header": base64.b64encode(raw_header).decode("ascii"), } if calc_actual_crc: running_crc = crc32(b"") for chunk in decompressing_reader(fp, file_offset, zipinfo.compress_size, zipinfo.compress_type): running_crc = crc32(chunk, running_crc) info["actual_crc32"] = running_crc return info def analyze_zip( fp: BinaryIO, z: zipfile.ZipFile, *, check_crc: bool = False, block_size: int = 4096, ) -> Optional[List[Dict[str, Any]]]: """ Calculates and returns some meta information that can be stored to json file and can be used to extract files from archive. Additionally, calculates and prints some statistic about file sizes and potential memory usage. """ result: List[Dict[str, Any]] = [] # Some statistic maxrss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss * 1024 compressed_files_size = 0 uncompressed_files_size = 0 largest_compressed_file_size = 0 largest_uncompressed_file_size = 0 sparsable_size = 0 crc_errors = 0 other_errors = 0 last_print_tm = 0.0 filelist = [x for x in z.infolist() if not x.is_dir()] filecount = len(filelist) for i, zipinfo in enumerate(filelist): # Print progress information if sys.stderr.isatty(): tm = time.time() if i == filecount - 1 or tm - last_print_tm > 0.2: print(f"\r\033[K [{i + 1}/{filecount}] Analyzing {zipinfo.filename}", end="\r", file=sys.stderr, flush=True) last_print_tm = tm try: info = analyze_zipinfo(fp, zipinfo, calc_actual_crc=check_crc) except Exception as exc: print(f"ERROR: {zipinfo.filename}: {exc}") other_errors += 1 continue if check_crc: actual_crc32 = info.pop("actual_crc32") if actual_crc32 != zipinfo.CRC: print(f"ERROR: {zipinfo.filename}: CRC32 mismatch (expected {zipinfo.CRC}, got {actual_crc32})") crc_errors += 1 # Calculate size that can be sparsed sp_pos1, sp_pos2 = calc_sparsable_area(info["offset"], zipinfo.compress_size) sparsable_size += sp_pos2 - sp_pos1 # Calculate size stat compressed_files_size += zipinfo.compress_size if zipinfo.compress_size > largest_compressed_file_size: largest_compressed_file_size += zipinfo.compress_size uncompressed_files_size += zipinfo.file_size if zipinfo.file_size > largest_uncompressed_file_size: largest_uncompressed_file_size += zipinfo.file_size result.append(info) if sys.stderr.isatty(): print("\r\033[K", end="", file=sys.stderr) peak_ram = int(maxrss + largest_compressed_file_size + largest_uncompressed_file_size) print(f"Total compressed files size: {human_size(compressed_files_size)} ({compressed_files_size} bytes)") print(f"Total uncompressed files size: {human_size(uncompressed_files_size)} ({uncompressed_files_size} bytes)") print(f"Largest compressed file: {human_size(largest_compressed_file_size)} ({largest_compressed_file_size} bytes)") print(f"Largest uncompressed file: {human_size(largest_uncompressed_file_size)} ({largest_uncompressed_file_size} bytes)") print(f"Size that can be sparsed: {human_size(sparsable_size)} ({sparsable_size} bytes)") print("") print(f"You probably need {human_size(peak_ram)} RAM and {human_size(uncompressed_files_size - sparsable_size + largest_uncompressed_file_size)} free disk space") if crc_errors or other_errors: print("Some errors found") return None return result def extract_file_from_zip( zfp: BinaryIO, info: Dict[str, Any], output_dir: str, *, decompress: bool = False, check_crc: bool = True, create_directory: bool = True, ) -> str: """ Extracts file from zip archive using meta information generated by the analyze_zipinfo function. It does not modify the original zip file. Returns relative path to the created file. """ # TODO: check that file hasn't escaped out of output_dir file_output_path = os.path.join(output_dir, info["filename"]) if not decompress: file_output_path += compressed_extensions.get(info["compress_type"], "") if create_directory: file_output_dir = os.path.dirname(file_output_path) if not os.path.isdir(file_output_dir): os.makedirs(file_output_dir) # Extract try: with open(file_output_path, "wb") as wfp: if decompress or info["compress_type"] == zipfile.ZIP_STORED: running_crc = crc32(b"") for chunk in decompressing_reader(zfp, info["offset"], info["compressed_size"], compress_type=info["compress_type"]): wfp.write(chunk) running_crc = crc32(chunk, running_crc) if check_crc and running_crc != info["crc32"]: raise ValueError(f"CRC32 mismatch (expected {info['crc32']}, got {running_crc})") else: # We can't check crc of compressed data... for chunk in raw_reader(zfp, info["offset"], info["compressed_size"]): wfp.write(chunk) except BaseException: # Don't keep broken files on disk if os.path.exists(file_output_path): os.remove(file_output_path) raise if "mtime" in info: os.utime(file_output_path, (info["mtime"], info["mtime"])) return file_output_path def sparse_file( fileno: int, pos_start: int, pos_end: int, ) -> bool: if pos_end <= pos_start: # nothing to sparse return False # fallocate_args = [ # "fallocate", # "--punch-hole", # "--offset", # str(pos_start), # "--length", # str(pos_end - pos_start), # "--", # archive_path, # ] # subprocess.run(fallocate_args, shell=False, check=True) rc = fallocate( fileno, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, pos_start, pos_end - pos_start, ) if rc != 0: raise RuntimeError("fallocate failed") return True def restore_file_to_zip( zfp: BinaryIO, info: Dict[str, Any], output_dir: str, ) -> None: """ Copies the file that was extracted (but not decompressed) from a zip file back to the zip file. The offset is taken from the meta information generated by the analyze_zipinfo function. """ # TODO: check that file hasn't escaped out of output_dir file_output_path = os.path.join(output_dir, info["filename"]) file_output_path += compressed_extensions.get(info["compress_type"], "") if os.stat(file_output_path).st_size != info["compressed_size"]: raise ValueError("Size mismatch") with open(file_output_path, "rb") as fp: if info.get("raw_header"): raw_header = base64.b64decode(info["raw_header"]) zfp.seek(info["offset"] - len(raw_header), 0) zfp.write(raw_header) else: zfp.seek(info["offset"], 0) while True: chunk = fp.read(65536) if not chunk: break zfp.write(chunk) # utils def raw_reader(fp: BinaryIO, offset: int, size: int) -> Generator[bytes, None, None]: fp.seek(offset, 0) remaining = size while remaining > 0: chunk = fp.read(min(65536, remaining)) if not chunk: break remaining -= len(chunk) yield chunk def decompressing_reader(fp: BinaryIO, offset: int, size: int, compress_type: int) -> Generator[bytes, None, None]: if compress_type == zipfile.ZIP_STORED: for chunk in raw_reader(fp, offset, size): yield chunk return if compress_type == zipfile.ZIP_DEFLATED: # TODO: optimize memory usage by using decompressobj data = b"".join(raw_reader(fp, offset, size)) yield zlib.decompress(data, -15) return raise ValueError(f"Unsupported compression type {compress_type!r}") def calc_sparsable_area(offset: int, size: int, block_size: int = 4096) -> Tuple[int, int]: """ Aligns the positions to block size and returns the start and end positions of the chunk that can be sparsed. If the chunk is too small or cannot be aligned, returns two zeroes. """ pos1 = (offset // block_size) * block_size if offset % block_size != 0: pos1 += block_size pos2 = ((offset + size) // block_size) * block_size if pos2 <= pos1: return 0, 0 return pos1, pos2 def human_size(s: int) -> str: if s == 1: return "1 byte" if s < 1000: return f"{s} bytes" if s < 1000000: return f"{s / 1024.0:.1f} KiB" if s < 1000000000: return f"{s / 1024.0 / 1024.0:.1f} MiB" return f"{s / 1024.0 / 1024.0 / 1024.0:.1f} GiB" # entrypoints def main_analyze(args: argparse.Namespace) -> int: with open(args.zipfile, "rb") as fp, zipfile.ZipFile(fp, mode="r", allowZip64=True) as z: print("Analyzing zip...") infolist = analyze_zip(fp, z, check_crc=args.check_crc, block_size=args.block_size) if infolist is None: return 1 archive_info = { "archive_path": args.zipfile, "archive_size": os.stat(args.zipfile).st_size, "files": infolist, } with open(args.metafile, "w", encoding="utf-8") as jfp: json.dump(archive_info, jfp, indent=2, ensure_ascii=False) print() print(f"Meta information saved to {args.metafile!r}") print("Now you can use 'extract' command to extract and destroy the archive") return 0 def main_extract(args: argparse.Namespace) -> int: group_sparse_bytes = int(args.grouped_sparse) * 1024 * 1024 use_sparse: bool = not args.nosparse with open(args.metafile, "r", encoding="utf-8-sig") as jfp: metainfo = dict(json.load(jfp)) archive_path = os.path.join(os.path.dirname(args.metafile), metainfo["archive_path"]) if os.stat(archive_path).st_size != metainfo["archive_size"]: print("Archive size mismatch") return 1 output_dir: str = args.output if os.path.exists(output_dir) and os.listdir(output_dir): print("Output directory is not empty", file=sys.stderr) return 1 infolist: List[Dict[str, Any]] = metainfo["files"] infolist.sort(key=lambda x: x["offset"]) for info in infolist: if info["compress_type"] not in (zipfile.ZIP_STORED, zipfile.ZIP_DEFLATED): print(f"ERROR: {info['filename']}: unsupported compression type {info['compress_type']}") return 1 success_count = 0 failed_count = 0 if use_sparse: zfp = open(archive_path, "r+b") else: zfp = open(archive_path, "rb") with zfp: fileno = zfp.fileno() if use_sparse: print("This command will DESTROY YOUR ZIP ARCHIVE while extracting files.") print("If you interrupt the process, you probably won't recover your files.") print("Type 'yes' to continue.", flush=True) if input("> ").lower().strip() != "yes": print("Aborted.") return 1 else: print("ZIP archive will not be destroyed, everything is safe.", flush=True) print("Making directories...", end=" ", flush=True) for info in infolist: file_output_dir = os.path.dirname(os.path.join(output_dir, info["filename"])) if not os.path.isdir(file_output_dir): os.makedirs(file_output_dir) print("Done.") fallocate_groups: List[Tuple[int, int]] = [] filecount = len(infolist) last_print_tm = 0.0 for i, info in enumerate(infolist): # Print progress information if sys.stderr.isatty(): tm = time.time() if i == filecount - 1 or tm - last_print_tm > 0.2: print(f"\r\033[K [{i + 1}/{filecount}] Extracting {info['filename']}", end="\r", file=sys.stderr, flush=True) last_print_tm = tm try: extract_file_from_zip(zfp, info, output_dir, decompress=args.decompress, create_directory=False) except Exception as exc: print(f"ERROR: {info['filename']}: {exc}") failed_count += 1 fallocate_groups.clear() continue if use_sparse: chunk_to_sparse = calc_sparsable_area(info["offset"], info["compressed_size"], block_size=args.block_size) if sum(chunk_to_sparse) <= 0: # nothing to sparse pass elif group_sparse_bytes <= 0: sparse_file( fileno, pos_start=chunk_to_sparse[0], pos_end=chunk_to_sparse[1], ) else: if fallocate_groups and chunk_to_sparse[0] - fallocate_groups[0][0] >= group_sparse_bytes: sparse_file( fileno, pos_start=fallocate_groups[0][0], pos_end=chunk_to_sparse[0], ) fallocate_groups.clear() fallocate_groups.append(chunk_to_sparse) success_count += 1 if use_sparse and fallocate_groups: sparse_file( fileno, pos_start=fallocate_groups[0][0], pos_end=fallocate_groups[-1][1], ) fallocate_groups.clear() if sys.stderr.isatty(): print("\r\033[K", file=sys.stderr, end="", flush=True) print(f"Unpacked {success_count} files with {failed_count} errors") return 0 if failed_count == 0 else 1 def main_restore(args: argparse.Namespace) -> int: with open(args.metafile, "r", encoding="utf-8-sig") as jfp: metainfo = dict(json.load(jfp)) archive_path = os.path.join(os.path.dirname(args.metafile), metainfo["archive_path"]) if os.stat(archive_path).st_size != metainfo["archive_size"]: print("Archive size mismatch") return 1 failed_count = 0 with open(archive_path, "r+b") as zfp: infolist = metainfo["files"] filecount = len(infolist) last_print_tm = 0.0 for i, info in enumerate(infolist): # Print progress information if sys.stderr.isatty(): tm = time.time() if i == filecount - 1 or tm - last_print_tm > 0.2: print(f"\r\033[K [{i + 1}/{filecount}] Restoring {info['filename']}", end="\r", file=sys.stderr, flush=True) last_print_tm = tm try: restore_file_to_zip(zfp, info, args.output) except Exception as exc: print(f"ERROR: {info['filename']}: {exc}") failed_count += 1 if sys.stderr.isatty(): print("\r\033[K", file=sys.stderr, end="", flush=True) if failed_count > 0: print(f"{failed_count} files failed.") return 1 print("Done.") return 0 def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("-b", "--block-size", type=int, default=4096, help="Filesystem block size in bytes (default: 4096)") subparsers = parser.add_subparsers(dest="command") subparsers.required = True parser_analyze = subparsers.add_parser("analyze", help="Analyze ZIP archive and create information file") parser_analyze.add_argument("-c", "--check-crc", action="store_true", default=False, help="Analyze, check CRC32 and exit") parser_analyze.add_argument("zipfile", metavar="ARCHIVE", help="ZIP archive to pslit and sparse") parser_analyze.add_argument("metafile", metavar="METAFILE", help="JSON output file to store meta information") parser_extract = subparsers.add_parser("extract", help="Extract ZIP archive and destroy it using fallocate") parser_extract.add_argument("--decompress", action="store_true", default=False, help="Decompress extracted files if they are compressed") parser_extract.add_argument("--nosparse", action="store_true", default=False, help="Do not call fallocate and keep the ZIP file untouched") parser_extract.add_argument("--grouped-sparse", type=int, default=0, help="Group fallocate calls to larger chunks with specified size in MiB (will destroy zip file headers!)") parser_extract.add_argument("metafile", metavar="METAFILE", help="JSON output file to store meta information") parser_extract.add_argument("output", metavar="OUTPUTDIR", default=".", help="Output directory") parser_restore = subparsers.add_parser("restore", help="Restore original ZIP archive using extracted compressed files") parser_restore.add_argument("metafile", metavar="METAFILE", help="JSON output file to store meta information") parser_restore.add_argument("output", metavar="OUTPUTDIR", default=".", help="Output directory which contains extracted compressed files") args = parser.parse_args() if args.block_size <= 0: print("Invalid block size") return 1 if args.command == "analyze": return main_analyze(args) if args.command == "extract": return main_extract(args) if args.command == "restore": return main_restore(args) return 0 if __name__ == "__main__": sys.exit(main())