OpenAI Cookbook 06月25日 15:08
No Title
index_new5.html
../../../zaker_core/zaker_tpl_static/wap/tpl_guoji1.html

 

该Python工具包提供了一种用于处理文本文件差分(diff)和补丁(patch)的实用程序。它允许用户通过“伪差分”补丁文件来修改文本文件集合,包括添加、删除和更新文件。该工具包的核心是解析和应用补丁文件的能力,确保了文件更改的准确性和可靠性,适用于版本控制和文件同步等多种场景。

💡 该工具包定义了`ActionType`枚举,明确了文件操作类型,包括添加、删除和更新,为文件变更操作提供了清晰的分类。

💾 `FileChange` 数据类详细描述了文件的变更信息,包括变更类型、旧内容、新内容和移动路径,使得对文件修改的追踪更加细致。

📜 `Parser` 类是核心组件,负责解析补丁文件。它能够识别并处理添加、删除和更新文件的操作,确保补丁文件的正确应用。

⚙️ 工具包通过`find_context`函数实现上下文匹配,提高了补丁应用的准确性,即使在原始文件略有改动的情况下,也能找到合适的位置应用补丁。

#!/usr/bin/env python3"""A self-contained **pure-Python 3.9+** utility for applying human-readable“pseudo-diff” patch files to a collection of text files."""from __future__ import annotationsimport pathlibfrom dataclasses import dataclass, fieldfrom enum import Enumfrom typing import (    Callable,    Dict,    List,    Optional,    Tuple,    Union,)# --------------------------------------------------------------------------- ##  Domain objects# --------------------------------------------------------------------------- #class ActionType(str, Enum):    ADD = "add"    DELETE = "delete"    UPDATE = "update"@dataclassclass FileChange:    type: ActionType    old_content: Optional[str] = None    new_content: Optional[str] = None    move_path: Optional[str] = None@dataclassclass Commit:    changes: Dict[str, FileChange] = field(default_factory=dict)# --------------------------------------------------------------------------- ##  Exceptions# --------------------------------------------------------------------------- #class DiffError(ValueError):    """Any problem detected while parsing or applying a patch."""# --------------------------------------------------------------------------- ##  Helper dataclasses used while parsing patches# --------------------------------------------------------------------------- #@dataclassclass Chunk:    orig_index: int = -1    del_lines: List[str] = field(default_factory=list)    ins_lines: List[str] = field(default_factory=list)@dataclassclass PatchAction:    type: ActionType    new_file: Optional[str] = None    chunks: List[Chunk] = field(default_factory=list)    move_path: Optional[str] = None@dataclassclass Patch:    actions: Dict[str, PatchAction] = field(default_factory=dict)# --------------------------------------------------------------------------- ##  Patch text parser# --------------------------------------------------------------------------- #@dataclassclass Parser:    current_files: Dict[str, str]    lines: List[str]    index: int = 0    patch: Patch = field(default_factory=Patch)    fuzz: int = 0    # ------------- low-level helpers -------------------------------------- #    def _cur_line(self) -> str:        if self.index >= len(self.lines):            raise DiffError("Unexpected end of input while parsing patch")        return self.lines[self.index]    @staticmethod    def _norm(line: str) -> str:        """Strip CR so comparisons work for both LF and CRLF input."""        return line.rstrip("\r")    # ------------- scanning convenience ----------------------------------- #    def is_done(self, prefixes: Optional[Tuple[str, ...]] = None) -> bool:        if self.index >= len(self.lines):            return True        if (            prefixes            and len(prefixes) > 0            and self._norm(self._cur_line()).startswith(prefixes)        ):            return True        return False    def startswith(self, prefix: Union[str, Tuple[str, ...]]) -> bool:        return self._norm(self._cur_line()).startswith(prefix)    def read_str(self, prefix: str) -> str:        """        Consume the current line if it starts with *prefix* and return the text        **after** the prefix.  Raises if prefix is empty.        """        if prefix == "":            raise ValueError("read_str() requires a non-empty prefix")        if self._norm(self._cur_line()).startswith(prefix):            text = self._cur_line()[len(prefix) :]            self.index += 1            return text        return ""    def read_line(self) -> str:        """Return the current raw line and advance."""        line = self._cur_line()        self.index += 1        return line    # ------------- public entry point -------------------------------------- #    def parse(self) -> None:        while not self.is_done(("*** End Patch",)):            # ---------- UPDATE ---------- #            path = self.read_str("*** Update File: ")            if path:                if path in self.patch.actions:                    raise DiffError(f"Duplicate update for file: {path}")                move_to = self.read_str("*** Move to: ")                if path not in self.current_files:                    raise DiffError(f"Update File Error - missing file: {path}")                text = self.current_files[path]                action = self._parse_update_file(text)                action.move_path = move_to or None                self.patch.actions[path] = action                continue            # ---------- DELETE ---------- #            path = self.read_str("*** Delete File: ")            if path:                if path in self.patch.actions:                    raise DiffError(f"Duplicate delete for file: {path}")                if path not in self.current_files:                    raise DiffError(f"Delete File Error - missing file: {path}")                self.patch.actions[path] = PatchAction(type=ActionType.DELETE)                continue            # ---------- ADD ---------- #            path = self.read_str("*** Add File: ")            if path:                if path in self.patch.actions:                    raise DiffError(f"Duplicate add for file: {path}")                if path in self.current_files:                    raise DiffError(f"Add File Error - file already exists: {path}")                self.patch.actions[path] = self._parse_add_file()                continue            raise DiffError(f"Unknown line while parsing: {self._cur_line()}")        if not self.startswith("*** End Patch"):            raise DiffError("Missing *** End Patch sentinel")        self.index += 1  # consume sentinel    # ------------- section parsers ---------------------------------------- #    def _parse_update_file(self, text: str) -> PatchAction:        action = PatchAction(type=ActionType.UPDATE)        lines = text.split("\n")        index = 0        while not self.is_done(            (                "*** End Patch",                "*** Update File:",                "*** Delete File:",                "*** Add File:",                "*** End of File",            )        ):            def_str = self.read_str("@@ ")            section_str = ""            if not def_str and self._norm(self._cur_line()) == "@@":                section_str = self.read_line()            if not (def_str or section_str or index == 0):                raise DiffError(f"Invalid line in update section:\n{self._cur_line()}")            if def_str.strip():                found = False                if def_str not in lines[:index]:                    for i, s in enumerate(lines[index:], index):                        if s == def_str:                            index = i + 1                            found = True                            break                if not found and def_str.strip() not in [                    s.strip() for s in lines[:index]                ]:                    for i, s in enumerate(lines[index:], index):                        if s.strip() == def_str.strip():                            index = i + 1                            self.fuzz += 1                            found = True                            break            next_ctx, chunks, end_idx, eof = peek_next_section(self.lines, self.index)            new_index, fuzz = find_context(lines, next_ctx, index, eof)            if new_index == -1:                ctx_txt = "\n".join(next_ctx)                raise DiffError(                    f"Invalid {'EOF ' if eof else ''}context at {index}:\n{ctx_txt}"                )            self.fuzz += fuzz            for ch in chunks:                ch.orig_index += new_index                action.chunks.append(ch)            index = new_index + len(next_ctx)            self.index = end_idx        return action    def _parse_add_file(self) -> PatchAction:        lines: List[str] = []        while not self.is_done(            ("*** End Patch", "*** Update File:", "*** Delete File:", "*** Add File:")        ):            s = self.read_line()            if not s.startswith("+"):                raise DiffError(f"Invalid Add File line (missing '+'): {s}")            lines.append(s[1:])  # strip leading '+'        return PatchAction(type=ActionType.ADD, new_file="\n".join(lines))# --------------------------------------------------------------------------- ##  Helper functions# --------------------------------------------------------------------------- #def find_context_core(    lines: List[str], context: List[str], start: int) -> Tuple[int, int]:    if not context:        return start, 0    for i in range(start, len(lines)):        if lines[i : i + len(context)] == context:            return i, 0    for i in range(start, len(lines)):        if [s.rstrip() for s in lines[i : i + len(context)]] == [            s.rstrip() for s in context        ]:            return i, 1    for i in range(start, len(lines)):        if [s.strip() for s in lines[i : i + len(context)]] == [            s.strip() for s in context        ]:            return i, 100    return -1, 0def find_context(    lines: List[str], context: List[str], start: int, eof: bool) -> Tuple[int, int]:    if eof:        new_index, fuzz = find_context_core(lines, context, len(lines) - len(context))        if new_index != -1:            return new_index, fuzz        new_index, fuzz = find_context_core(lines, context, start)        return new_index, fuzz + 10_000    return find_context_core(lines, context, start)def peek_next_section(    lines: List[str], index: int) -> Tuple[List[str], List[Chunk], int, bool]:    old: List[str] = []    del_lines: List[str] = []    ins_lines: List[str] = []    chunks: List[Chunk] = []    mode = "keep"    orig_index = index    while index < len(lines):        s = lines[index]        if s.startswith(            (                "@@",                "*** End Patch",                "*** Update File:",                "*** Delete File:",                "*** Add File:",                "*** End of File",            )        ):            break        if s == "***":            break        if s.startswith("***"):            raise DiffError(f"Invalid Line: {s}")        index += 1        last_mode = mode        if s == "":            s = " "        if s[0] == "+":            mode = "add"        elif s[0] == "-":            mode = "delete"        elif s[0] == " ":            mode = "keep"        else:            raise DiffError(f"Invalid Line: {s}")        s = s[1:]        if mode == "keep" and last_mode != mode:            if ins_lines or del_lines:                chunks.append(                    Chunk(                        orig_index=len(old) - len(del_lines),                        del_lines=del_lines,                        ins_lines=ins_lines,                    )                )            del_lines, ins_lines = [], []        if mode == "delete":            del_lines.append(s)            old.append(s)        elif mode == "add":            ins_lines.append(s)        elif mode == "keep":            old.append(s)    if ins_lines or del_lines:        chunks.append(            Chunk(                orig_index=len(old) - len(del_lines),                del_lines=del_lines,                ins_lines=ins_lines,            )        )    if index < len(lines) and lines[index] == "*** End of File":        index += 1        return old, chunks, index, True    if index == orig_index:        raise DiffError("Nothing in this section")    return old, chunks, index, False# --------------------------------------------------------------------------- ##  Patch → Commit and Commit application# --------------------------------------------------------------------------- #def _get_updated_file(text: str, action: PatchAction, path: str) -> str:    if action.type is not ActionType.UPDATE:        raise DiffError("_get_updated_file called with non-update action")    orig_lines = text.split("\n")    dest_lines: List[str] = []    orig_index = 0    for chunk in action.chunks:        if chunk.orig_index > len(orig_lines):            raise DiffError(                f"{path}: chunk.orig_index {chunk.orig_index} exceeds file length"            )        if orig_index > chunk.orig_index:            raise DiffError(                f"{path}: overlapping chunks at {orig_index} > {chunk.orig_index}"            )        dest_lines.extend(orig_lines[orig_index : chunk.orig_index])        orig_index = chunk.orig_index        dest_lines.extend(chunk.ins_lines)        orig_index += len(chunk.del_lines)    dest_lines.extend(orig_lines[orig_index:])    return "\n".join(dest_lines)def patch_to_commit(patch: Patch, orig: Dict[str, str]) -> Commit:    commit = Commit()    for path, action in patch.actions.items():        if action.type is ActionType.DELETE:            commit.changes[path] = FileChange(                type=ActionType.DELETE, old_content=orig[path]            )        elif action.type is ActionType.ADD:            if action.new_file is None:                raise DiffError("ADD action without file content")            commit.changes[path] = FileChange(                type=ActionType.ADD, new_content=action.new_file            )        elif action.type is ActionType.UPDATE:            new_content = _get_updated_file(orig[path], action, path)            commit.changes[path] = FileChange(                type=ActionType.UPDATE,                old_content=orig[path],                new_content=new_content,                move_path=action.move_path,            )    return commit# --------------------------------------------------------------------------- ##  User-facing helpers# --------------------------------------------------------------------------- #def text_to_patch(text: str, orig: Dict[str, str]) -> Tuple[Patch, int]:    lines = text.splitlines()  # preserves blank lines, no strip()    if (        len(lines) < 2        or not Parser._norm(lines[0]).startswith("*** Begin Patch")        or Parser._norm(lines[-1]) != "*** End Patch"    ):        raise DiffError("Invalid patch text - missing sentinels")    parser = Parser(current_files=orig, lines=lines, index=1)    parser.parse()    return parser.patch, parser.fuzzdef identify_files_needed(text: str) -> List[str]:    lines = text.splitlines()    return [        line[len("*** Update File: ") :]        for line in lines        if line.startswith("*** Update File: ")    ] + [        line[len("*** Delete File: ") :]        for line in lines        if line.startswith("*** Delete File: ")    ]def identify_files_added(text: str) -> List[str]:    lines = text.splitlines()    return [        line[len("*** Add File: ") :]        for line in lines        if line.startswith("*** Add File: ")    ]# --------------------------------------------------------------------------- ##  File-system helpers# --------------------------------------------------------------------------- #def load_files(paths: List[str], open_fn: Callable[[str], str]) -> Dict[str, str]:    return {path: open_fn(path) for path in paths}def apply_commit(    commit: Commit,    write_fn: Callable[[str, str], None],    remove_fn: Callable[[str], None],) -> None:    for path, change in commit.changes.items():        if change.type is ActionType.DELETE:            remove_fn(path)        elif change.type is ActionType.ADD:            if change.new_content is None:                raise DiffError(f"ADD change for {path} has no content")            write_fn(path, change.new_content)        elif change.type is ActionType.UPDATE:            if change.new_content is None:                raise DiffError(f"UPDATE change for {path} has no new content")            target = change.move_path or path            write_fn(target, change.new_content)            if change.move_path:                remove_fn(path)def process_patch(    text: str,    open_fn: Callable[[str], str],    write_fn: Callable[[str, str], None],    remove_fn: Callable[[str], None],) -> str:    if not text.startswith("*** Begin Patch"):        raise DiffError("Patch text must start with *** Begin Patch")    paths = identify_files_needed(text)    orig = load_files(paths, open_fn)    patch, _fuzz = text_to_patch(text, orig)    commit = patch_to_commit(patch, orig)    apply_commit(commit, write_fn, remove_fn)    return "Done!"# --------------------------------------------------------------------------- ##  Default FS helpers# --------------------------------------------------------------------------- #def open_file(path: str) -> str:    with open(path, "rt", encoding="utf-8") as fh:        return fh.read()def write_file(path: str, content: str) -> None:    target = pathlib.Path(path)    target.parent.mkdir(parents=True, exist_ok=True)    with target.open("wt", encoding="utf-8") as fh:        fh.write(content)def remove_file(path: str) -> None:    pathlib.Path(path).unlink(missing_ok=True)# --------------------------------------------------------------------------- ##  CLI entry-point# --------------------------------------------------------------------------- #def main() -> None:    import sys    patch_text = sys.stdin.read()    if not patch_text:        print("Please pass patch text through stdin", file=sys.stderr)        return    try:        result = process_patch(patch_text, open_file, write_file, remove_file)    except DiffError as exc:        print(exc, file=sys.stderr)        return    print(result)if __name__ == "__main__":    main()

Fish AI Reader

Fish AI Reader

AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。

FishAI

FishAI

鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑

联系邮箱 441953276@qq.com

相关标签

Python 文本处理 差分 补丁
相关文章