Python 读取文本文件的最佳实践(3 种方式:小文件 / 大文件 / 最快)
Python 读取文本文件的最佳实践(3 种方式:小文件 / 大文件 / 最快)
场景痛点:读文本要么卡内存、要么速度慢、还老是中文乱码。 最佳方案一句话:小文件直接读,大文件流式读,追求极致用 mmap,并把编码与换行一次性处理好。 目录速览: A. 小文件最快写法 · B. 大文件稳妥写法 · C. 追求极致的最快写法mmap · Bonus 智能读取器 · FAQ · 性能自测模板 · 延伸阅读 · 更新记录
A. 小文件最快写法
适用:文本体积 ≤ 50 MB(经验阈值,可按需调整)。 特点:一行搞定,读进内存,编码与换行安全。
方法 A1:Path.read_text()(最短可运行)
from pathlib import Path
text = Path("data.txt").read_text(encoding="utf-8", errors="replace")
print(text[:200])
encoding="utf-8":主流默认;若文件带 BOM,可改 utf-8-sig。errors="replace":遇坏字节不崩溃,用 � 兜底。
方法 A2:经典 open().read()(便于扩展)
with open("data.txt", "r", encoding="utf-8", errors="replace") as f:
text = f.read()
何时选 A:需要整体解析(如全局正则、一次性转 JSON/Markdown 渲染)且文件不大。
B. 大文件稳妥写法
适用:50 MB~数 GB。 特点:常数内存、边读边处理、对换行/编码更健壮。
方法 B1:逐行迭代(最通用)
def read_lines_stream(path, encoding="utf-8"):
with open(path, "r", encoding=encoding, errors="replace") as f:
for line in f: # 通用换行:\r\n / \r 自动归一
yield line.rstrip("\n") # 如需去掉行尾换行
# 用法
for i, line in enumerate(read_lines_stream("big.log"), 1):
if i <= 5:
print(line)
优点:写法最简、适用面广。 注意:极端超长行(单行数百 MB)仍会在内存中形成一个大字符串。
方法 B2:按块读取(应对“超长行”)
def read_in_chunks(path, chunk_size=1024*1024, encoding="utf-8"):
buf = ""
with open(path, "r", encoding=encoding, errors="replace") as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
buf += chunk
*lines, buf = buf.split("\n")
for line in lines:
yield line.rstrip("\r") # 兼容 \r\n
if buf: # 文件末尾可能无换行
yield buf.rstrip("\r")
# 用法
for line in read_in_chunks("ultra_long_lines.txt"):
# 处理每一行
pass
优点:可控内存,对超长行友好;缺点:代码略繁。
何时选 B:文件很大,需要流水线处理(统计/过滤/落盘),或担心“超长行”。
C. 追求极致的最快写法(mmap)
适用:需要最大吞吐的只读扫描(全文搜索、关键词定位、字段切分)。 关键点:减少系统调用,把文件直接映射到进程地址空间,按字节找换行再解码。
方法 C1:mmap 逐行扫描(高吞吐 · 常数内存)
import mmap
def iter_lines_mmap(path: str, encoding: str = "utf-8"):
"""
用 mmap 逐行读取超大文本;返回不含行尾换行的 str 生成器。
适合:全文扫描、关键词查找、极大日志处理等。
"""
with open(path, "rb") as fb:
with mmap.mmap(fb.fileno(), length=0, access=mmap.ACCESS_READ) as mm:
n = len(mm)
start = 0
while True:
end = mm.find(b"\n", start) # 字节级查找换行
if end == -1:
if start < n: # 末尾最后一行(可能无 \n)
yield mm[start:n].decode(encoding, errors="replace").rstrip("\r\n")
break
yield mm[start:end].decode(encoding, errors="replace").rstrip("\r")
start = end + 1
# 用法
# for i, line in enumerate(iter_lines_mmap("very_big.txt"), 1):
# if i <= 5:
# print(line)
方法 C2:mmap 整体读取后解码(少见,但最省代码)
import mmap
def read_all_with_mmap(path: str, encoding: str = "utf-8") -> str:
"""
映射整个文件后一次性解码为 str。
注意:需要近似文件大小的可用内存;不适合超大文件。
"""
with open(path, "rb") as fb:
with mmap.mmap(fb.fileno(), 0, access=mmap.ACCESS_READ) as mm:
return mm[:].decode(encoding, errors="replace")
经验
SSD、本地 NVMe、现代文件系统下收益更明显;机械硬盘/远程盘收益有限。mmap 避开 Python 层逐次 read() 的开销,顺序扫描时吞吐常常更高。需要自行处理换行与解码,跨平台建议按字节处理再 decode。
Bonus:智能读取器模板(即抄即用)
自动按文件大小选策略:小文件一次性读;大文件逐行;想要极致可切换 mmap。
from pathlib import Path
from typing import Iterable, Iterator, Literal
import mmap
Strategy = Literal["auto", "stream", "mmap", "all"]
def smart_read_lines(path: str | Path,
encoding: str = "utf-8",
large_threshold_mb: int = 50,
strategy: Strategy = "auto") -> Iterator[str]:
p = Path(path)
size_mb = p.stat().st_size / (1024 * 1024)
def _stream() -> Iterable[str]:
with open(p, "r", encoding=encoding, errors="replace") as f:
for line in f:
yield line.rstrip("\n")
def _mmap_iter() -> Iterable[str]:
with open(p, "rb") as fb:
with mmap.mmap(fb.fileno(), 0, access=mmap.ACCESS_READ) as mm:
n = len(mm)
start = 0
while True:
end = mm.find(b"\n", start)
if end == -1:
if start < n:
yield mm[start:n].decode(encoding, errors="replace").rstrip("\r\n")
break
yield mm[start:end].decode(encoding, errors="replace").rstrip("\r")
start = end + 1
if strategy == "all": # 小文件整体读,返回行生成器
text = p.read_text(encoding=encoding, errors="replace")
return (line for line in text.splitlines())
if strategy == "stream" or (strategy == "auto" and size_mb > large_threshold_mb):
return _stream()
if strategy == "mmap":
return _mmap_iter()
# 默认 auto + 小文件:整体读
text = p.read_text(encoding=encoding, errors="replace")
return (line for line in text.splitlines())
常见坑/FAQ
Q1:中文乱码怎么办?
首选 utf-8;有 BOM 时用 utf-8-sig。
不确定编码时可多编码尝试 + errors="replace" 兜底(不崩溃,先跑通再说):
for enc in ("utf-8", "utf-8-sig", "gb18030"):
try:
with open("file.txt", encoding=enc) as f: f.read(1024)
chosen = enc; break
except UnicodeDecodeError:
pass
自动探测(chardet / charset-normalizer)不保证准确,仅作参考。
Q2:Windows 多出空行/换行杂?
读时不要指定 newline(让 Python 做“通用换行”);写 CSV 时另说:写 CSV 需 newline="",避免空行(那是写问题)。
Q3:逐行也可能 OOM 吗?
一行依然要完整进内存,极端“超长行”会顶爆;用 B2 块读 或 C1 mmap 字节扫描。
Q4:如何无感读取 .gz 压缩日志?
from pathlib import Path
import gzip
def open_text_auto(path, encoding="utf-8"):
p = Path(path)
if p.suffix == ".gz":
return gzip.open(p, "rt", encoding=encoding, errors="replace")
return open(p, "r", encoding=encoding, errors="replace")
with open_text_auto("app.log.gz") as f:
for line in f:
pass
Q5:我需要更快?
关掉实时杀毒/索引(Windows Defender / Spotlight)对大量小文件的拖慢很明显;批量处理多文件用线程池(I/O 密集友好);单文件顺序读已接近磁盘极限。
性能自测模板
复制即可在本机跑,输出每种方法读取同一文件的耗时。附带测试数据生成器(不会覆盖已有文件)。
from pathlib import Path
import os, time, mmap, random, string
FILE = Path("big.txt")
TARGET_MB = 100 # 生成约 100MB 示例文件
def ensure_sample_file(path: Path, target_mb: int = 100, lines_per_chunk: int = 10000):
if path.exists() and path.stat().st_size >= target_mb * 1024 * 1024:
print(f"[ok] reuse existing {path} ({path.stat().st_size/1024/1024:.1f}MB)")
return
print(f"[gen] writing {target_mb}MB text to {path} ...")
line = "".join(random.choices(string.ascii_letters + string.digits + " _-", k=120))
chunk = (line + "\n") * lines_per_chunk
with open(path, "w", encoding="utf-8", newline="\n") as f:
written = 0
target = target_mb * 1024 * 1024
while written < target:
f.write(chunk)
written += len(chunk.encode("utf-8"))
print(f"[done] size={path.stat().st_size/1024/1024:.1f}MB")
def bench(name, func):
t0 = time.perf_counter()
n = func()
dt = time.perf_counter() - t0
print(f"{name:16s} {dt:7.3f}s lines={n}")
def read_all():
s = FILE.read_text(encoding="utf-8", errors="replace")
return s.count("\n") + (1 if s and not s.endswith("\n") else 0)
def read_iter():
cnt = 0
with open(FILE, "r", encoding="utf-8", errors="replace") as f:
for _ in f:
cnt += 1
return cnt
def read_mmap():
cnt = 0
with open(FILE, "rb") as fb, mmap.mmap(fb.fileno(), 0, access=mmap.ACCESS_READ) as mm:
start, n = 0, len(mm)
while True:
end = mm.find(b"\n", start)
if end == -1:
if start < n: cnt += 1
break
cnt += 1
start = end + 1
return cnt
if __name__ == "__main__":
ensure_sample_file(FILE, TARGET_MB)
bench("read_text", read_all) # A:一次性读取
bench("iter lines", read_iter) # B:逐行
bench("mmap bytes", read_mmap) # C:mmap 字节扫描
记录你的结果(示例占位,跑完替换):
文件大小read_text逐行迭代mmap10 MB– s– s– s100 MB– s– s– s1 GB– s– s– s
一般规律:mmap ≥ 逐行迭代 ≥ 一次性读取(对“只读扫描”场景)。但整体解析时(必须把全文件进内存),用 A 更直观。
小结
≤ 50 MB:Path.read_text() 一把梭。> 50 MB:逐行迭代(B1),遇超长行用块读(B2)。追求极致:mmap 字节级扫描,自行解码(C1)。统一兜底:errors="replace" 抗坏字节;读时不指定 newline 更省心。压缩日志:.gz 用 gzip.open(..., "rt") 无感读取。
延伸阅读
Python获取文本文件的行数