跳到主要内容

文件操作

文件操作是编程中常见的任务,Python 提供了丰富的文件处理功能。

文件读写

打开文件

使用 open() 函数打开文件,返回文件对象。

# 基本语法
# file = open(file_path, mode='r', encoding='utf-8')

# 打开文件(相对路径)
file = open("example.txt", "r", encoding="utf-8")

# 打开文件(绝对路径)
file = open("C:/Users/name/Desktop/example.txt", "r", encoding="utf-8")

# 使用原始字符串避免转义(Windows 路径)
file = open(r"C:\Users\name\Desktop\example.txt", "r", encoding="utf-8")

# 检查文件是否存在
import os

if os.path.exists("example.txt"):
file = open("example.txt", "r", encoding="utf-8")
print("文件打开成功")
else:
print("文件不存在")

with 语句(推荐)

使用 with 语句自动管理文件的打开和关闭。

# 推荐做法:使用 with 语句
with open("example.txt", "r", encoding="utf-8") as file:
content = file.read()
print(content)

# 文件会自动关闭,即使发生异常
# 等价于但不推荐的传统写法
file = open("example.txt", "r", encoding="utf-8")
try:
content = file.read()
print(content)
finally:
file.close()

# 同时打开多个文件
with open("input.txt", "r") as f_in, open("output.txt", "w") as f_out:
content = f_in.read()
f_out.write(content)

读取文件

read() - 读取全部内容

# 读取整个文件
with open("example.txt", "r", encoding="utf-8") as file:
content = file.read()
print(content)

# 限制读取的字符数
with open("example.txt", "r", encoding="utf-8") as file:
first_100 = file.read(100) # 只读前100个字符
print(first_100)

# 读取大文件(可能占用大量内存)
# 不推荐用于大文件

readline() - 读取一行

# 读取一行
with open("example.txt", "r", encoding="utf-8") as file:
line1 = file.readline()
print(line1, end="") # readline 保留换行符

line2 = file.readline()
print(line2, end="")

# 逐行读取(手动控制)
with open("example.txt", "r", encoding="utf-8") as file:
while True:
line = file.readline()
if not line: # 到达文件末尾
break
print(line, end="")

readlines() - 读取所有行

# 读取所有行到列表
with open("example.txt", "r", encoding="utf-8") as file:
lines = file.readlines()
print(lines) # ['line1\n', 'line2\n', 'line3\n']

# 去除换行符
lines = [line.strip() for line in lines]
print(lines) # ['line1', 'line2', 'line3']

# Read only the first 3 lines.
# readlines(hint) does NOT take a line count: hint is an approximate size
# limit, and reading stops once the total size of the lines read so far
# exceeds it. itertools.islice stops after exactly 3 lines and never loads
# the rest of the file.
from itertools import islice

with open("example.txt", "r", encoding="utf-8") as file:
    first_3_lines = list(islice(file, 3))
    print(first_3_lines)

遍历文件(推荐)

# 直接遍历文件对象(推荐)
with open("example.txt", "r", encoding="utf-8") as file:
for line in file:
print(line, end="")

# 这种方法内存友好,适合大文件

# 处理大文件的示例
with open("large_file.txt", "r", encoding="utf-8") as file:
line_count = 0
for line in file:
line_count += 1
if line_count % 1000 == 0:
print(f"已处理 {line_count} 行")
print(f"总共 {line_count} 行")

写入文件

write() - 写入字符串

# 写入字符串
with open("output.txt", "w", encoding="utf-8") as file:
file.write("Hello, World!\n")
file.write("This is a new file.\n")

# 注意:如果文件不存在会创建,如果存在会覆盖

# 写入多行
lines = ["Line 1\n", "Line 2\n", "Line 3\n"]
with open("output.txt", "w", encoding="utf-8") as file:
file.writelines(lines)

# 或使用循环
with open("output.txt", "w", encoding="utf-8") as file:
for line in lines:
file.write(line)

追加模式

# 追加内容到文件末尾
with open("output.txt", "a", encoding="utf-8") as file:
file.write("This line is appended.\n")

# 追加多行
with open("output.txt", "a", encoding="utf-8") as file:
file.writelines(["Line A\n", "Line B\n"])

关闭文件

# 手动关闭(不推荐)
file = open("example.txt", "r", encoding="utf-8")
content = file.read()
file.close() # 必须手动关闭

# 使用 with 自动关闭(推荐)
with open("example.txt", "r", encoding="utf-8") as file:
content = file.read()
# 文件自动关闭

# 检查文件是否关闭
print(file.closed) # True(with 块结束后)

# flush() - push Python's internal write buffer to the operating system
with open("output.txt", "w", encoding="utf-8") as file:
    file.write("Hello")
    # flush() hands the buffered data to the OS without waiting for the
    # buffer to fill. It does not by itself guarantee the data is on disk;
    # call os.fsync(file.fileno()) after flush() when that is required.
    file.flush()

文件指针位置

with open("example.txt", "r", encoding="utf-8") as file:
    # tell() - current position; always 0 right after opening for reading
    print(f"当前位置: {file.tell()}")  # 0

    # read(10) - read 10 characters
    content = file.read(10)
    print(f"内容: {content}")
    # In text mode tell() returns an opaque number, not a character count:
    # it equals 10 here only when those 10 characters are single-byte (ASCII).
    print(f"当前位置: {file.tell()}")

    # seek(0) - move back to the start of the file
    file.seek(0)
    print(f"seek后位置: {file.tell()}")  # 0

    # Read everything
    content = file.read()
    print(content)

    # seek(10) - in text mode only 0 or a value previously returned by tell()
    # is a well-defined offset; an arbitrary number such as 10 may land in the
    # middle of a multi-byte character. Open the file in binary mode ("rb")
    # when real byte offsets are needed.
    file.seek(10)
    print(f"当前位置: {file.tell()}")

    # seek(0, 2) - jump to the end of the file.
    # whence: 0 = from start, 1 = from current position, 2 = from end.
    # Text files only support whence=0, plus this seek(0, 2) special case.
    file.seek(0, 2)
    print(f"文件末尾: {file.tell()}")

文件模式

基本模式

# 'r' - 只读(默认)
# 文件必须存在,否则报错
with open("example.txt", "r", encoding="utf-8") as file:
content = file.read()

# 'w' - 只写
# 文件不存在则创建,存在则清空
with open("output.txt", "w", encoding="utf-8") as file:
file.write("New content")

# 'a' - 追加
# 文件不存在则创建,存在则追加到末尾
with open("output.txt", "a", encoding="utf-8") as file:
file.write("Appended content")

# 'x' - 排他性创建
# 文件不存在则创建,存在则报错(防止覆盖)
try:
with open("new_file.txt", "x", encoding="utf-8") as file:
file.write("New file")
except FileExistsError:
print("文件已存在")

读写模式

# 'r+' - 读写(文件必须存在)
with open("example.txt", "r+", encoding="utf-8") as file:
content = file.read() # 读取
file.seek(0) # 回到开头
file.write("Updated") # 写入

# 'w+' - 读写(清空文件)
with open("example.txt", "w+", encoding="utf-8") as file:
file.write("New content") # 写入
file.seek(0) # 回到开头
content = file.read() # 读取

# 'a+' - 读写追加(指针在末尾)
with open("example.txt", "a+", encoding="utf-8") as file:
file.write("Appended\n")
file.seek(0) # 回到开头才能读取
content = file.read()

二进制模式

# 'rb' - 二进制读取
with open("image.jpg", "rb") as file:
data = file.read()
print(f"文件大小: {len(data)} 字节")

# 'wb' - 二进制写入
with open("output.bin", "wb") as file:
data = b'\x00\x01\x02\x03\x04'
file.write(data)

# 'ab' - 二进制追加
with open("output.bin", "ab") as file:
file.write(b'\x05\x06\x07')

# 复制文件(二进制模式)
with open("source.jpg", "rb") as f_in:
data = f_in.read()
with open("copy.jpg", "wb") as f_out:
f_out.write(data)

文件模式总结

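模式 | 描述 | 文件不存在 | 文件存在 | 指针位置
'r' | 只读 | 报错 | 正常读取 | 开头
'w' | 只写 | 创建 | 清空 | 开头
'a' | 追加 | 创建 | 追加 | 末尾
'x' | 创建 | 创建 | 报错 | 开头
'r+' | 读写 | 报错 | 正常读写 | 开头
'w+' | 读写 | 创建 | 清空 | 开头
'a+' | 读写 | 创建 | 追加 | 末尾
'rb' | 二进制读 | 报错 | 正常读取 | 开头
'wb' | 二进制写 | 创建 | 清空 | 开头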

路径处理

os 模块

os 模块提供了传统的路径操作功能。

import os

# 获取当前工作目录
cwd = os.getcwd()
print(f"当前目录: {cwd}")

# 改变工作目录
os.chdir("/path/to/directory")

# 创建目录
os.mkdir("new_dir") # 创建单个目录
os.makedirs("parent/child/grandchild") # 创建多级目录

# 删除目录
os.rmdir("empty_dir") # 删除空目录
os.removedirs("parent/child/grandchild") # 删除多级空目录

# 删除文件
os.remove("file.txt")

# 重命名文件或目录
os.rename("old_name.txt", "new_name.txt")

# 检查路径
print(os.path.exists("file.txt")) # 是否存在
print(os.path.isfile("file.txt")) # 是否是文件
print(os.path.isdir("folder")) # 是否是目录

路径拼接

import os

# os.path.join() - 跨平台路径拼接
path = os.path.join("folder", "subfolder", "file.txt")
print(path) # folder/subfolder/file.txt(Linux/Mac)
# folder\subfolder\file.txt(Windows)

# 获取文件名和目录名
path = "/home/user/documents/file.txt"
dirname = os.path.dirname(path) # /home/user/documents
basename = os.path.basename(path) # file.txt

# 分割路径
dirname, filename = os.path.split(path)
print(dirname) # /home/user/documents
print(filename) # file.txt

# 分离文件名和扩展名
filename, ext = os.path.splitext("file.txt")
print(filename) # file
print(ext) # .txt

# 获取绝对路径
abs_path = os.path.abspath("file.txt")
print(abs_path)

# Normalize the path: drop "." segments and collapse each ".." against the
# segment before it. This is purely textual; the filesystem is not consulted.
norm_path = os.path.normpath("folder/./subfolder/../file.txt")
print(norm_path)  # folder/file.txt (folder\file.txt on Windows)

# 判断路径
print(os.path.isabs("/home/user")) # True(绝对路径)
print(os.path.isabs("file.txt")) # False(相对路径)

路径信息

import os

path = "/home/user/documents/file.txt"

# 获取文件大小(字节)
print(os.path.getsize(path))

# 获取修改时间
import time
mtime = os.path.getmtime(path)
print(time.ctime(mtime)) # 可读时间格式

# 判断各种属性
print(os.path.exists(path)) # 是否存在
print(os.path.isfile(path)) # 是否是文件
print(os.path.isdir(path)) # 是否是目录
print(os.path.islink(path)) # 是否是符号链接
print(os.path.ismount("/")) # 是否是挂载点

# 遍历目录
for root, dirs, files in os.walk("folder"):
print(f"目录: {root}")
print(f"子目录: {dirs}")
print(f"文件: {files}")
print("---")

pathlib 模块(推荐)

pathlib 是 Python 3.4+ 提供的面向对象路径处理库,更易用。

from pathlib import Path

# 创建 Path 对象
path = Path("folder/subfolder/file.txt")

# 路径拼接
path = Path("folder") / "subfolder" / "file.txt"
print(path) # folder/subfolder/file.txt

# 获取各个部分
print(path.name) # file.txt(文件名)
print(path.stem) # file(不含扩展名)
print(path.suffix) # .txt(扩展名)
print(path.parent) # folder/subfolder(父目录)
print(path.parents[1]) # folder(上级的上级)

# 获取绝对路径
abs_path = path.absolute()
print(abs_path)

# 检查路径
path.exists() # 是否存在
path.is_file() # 是否是文件
path.is_dir() # 是否是目录

# 读写文件(推荐使用)
path = Path("example.txt")

# 读取文件
content = path.read_text(encoding="utf-8")

# 写入文件
path.write_text("Hello, World!", encoding="utf-8")

# 读写二进制
data = path.read_bytes()
path.write_bytes(b"Binary data")

# 创建目录
Path("new_dir").mkdir()
Path("parent/child").mkdir(parents=True, exist_ok=True)

pathlib 遍历目录

from pathlib import Path

# 遍历目录
path = Path("folder")

# 遍历所有文件
for file in path.glob("*.txt"):
print(file)

# 递归遍历(包括子目录)
for file in path.rglob("*.txt"):
print(file)

# 遍历所有内容
for item in path.iterdir():
print(item)
if item.is_file():
print(f"文件: {item.name}")
elif item.is_dir():
print(f"目录: {item.name}")

# 递归遍历所有文件
for file in path.rglob("*"):
if file.is_file():
print(file)

实用示例

from pathlib import Path

# 批量重命名文件
folder = Path("photos")
for file in folder.glob("*.jpg"):
new_name = file.stem + "_old" + file.suffix
file.rename(folder / new_name)

# 查找并处理文件
for file in Path(".").rglob("*.log"):
if file.stat().st_size > 1024 * 1024: # 大于1MB
print(f"大文件: {file}")

# 创建目录结构
project = Path("project")
project.mkdir(exist_ok=True)
(project / "src").mkdir(exist_ok=True)
(project / "tests").mkdir(exist_ok=True)
(project / "docs").mkdir(exist_ok=True)

# Remove empty directories.
# The tree is snapshotted first and then walked deepest-first: removing a
# directory while rglob() is still scanning the tree may fail on some Python
# versions, and bottom-up order lets a parent that becomes empty be removed
# in the same pass.
all_paths = sorted(Path(".").rglob("*"), key=lambda p: len(p.parts), reverse=True)
for folder in all_paths:
    # Skip symlinks: is_dir() follows them, but rmdir() on a link would fail.
    if folder.is_symlink() or not folder.is_dir():
        continue
    if not any(folder.iterdir()):
        print(f"删除空目录: {folder}")
        folder.rmdir()

JSON 操作

JSON(JavaScript Object Notation)是一种轻量级的数据交换格式。

读取 JSON

import json

# 读取 JSON 文件
with open("data.json", "r", encoding="utf-8") as file:
data = json.load(file)
print(data)
print(type(data)) # dict 或 list

# 读取 JSON 字符串
json_string = '{"name": "Alice", "age": 25}'
data = json.loads(json_string)
print(data) # {'name': 'Alice', 'age': 25}

# 读取带注释的 JSON(需要去除注释)
def load_json_with_comments(file_path):
    """Load a JSON file that may contain ``//`` single-line comments.

    Only ``//`` sequences outside of string literals are treated as comments,
    so values such as ``"http://example.com"`` are preserved, and a trailing
    comment after real data removes the comment only, not the whole line.

    Args:
        file_path: Path of the UTF-8 encoded file to read.

    Returns:
        The decoded JSON value (typically a dict or a list).

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.
        json.JSONDecodeError: If the text is not valid JSON once the comments
            have been removed.
    """
    with open(file_path, "r", encoding="utf-8") as file:
        content = file.read()
    # JSON strings cannot contain raw newlines, so each line can be scanned
    # independently without carrying string state across lines.
    cleaned = "\n".join(_strip_line_comment(line) for line in content.split("\n"))
    return json.loads(cleaned)


def _strip_line_comment(line):
    """Return ``line`` with any ``//`` comment outside a JSON string removed."""
    in_string = False
    escaped = False
    for index, char in enumerate(line):
        if in_string:
            if escaped:
                escaped = False  # the character after a backslash is literal
            elif char == "\\":
                escaped = True
            elif char == '"':
                in_string = False
        elif char == '"':
            in_string = True
        elif char == "/" and line[index + 1:index + 2] == "/":
            return line[:index]
    return line

写入 JSON

import json

# 写入 JSON 文件
data = {
"name": "Alice",
"age": 25,
"city": "New York"
}

with open("output.json", "w", encoding="utf-8") as file:
json.dump(data, file, ensure_ascii=False, indent=2)

# ensure_ascii=False: 支持中文
# indent=2: 美化输出

# 转换为 JSON 字符串
json_string = json.dumps(data, ensure_ascii=False, indent=2)
print(json_string)

# 不换行输出(紧凑格式)
json_string = json.dumps(data, separators=(",", ":"))
print(json_string)

处理复杂 JSON

import json
from pathlib import Path

# 读取配置文件
config_file = Path("config.json")
if config_file.exists():
with open(config_file, "r", encoding="utf-8") as file:
config = json.load(file)
print(config)
else:
# 创建默认配置
config = {
"database": {
"host": "localhost",
"port": 5432,
"name": "mydb"
},
"debug": True,
"max_connections": 10
}
with open(config_file, "w", encoding="utf-8") as file:
json.dump(config, file, indent=2, ensure_ascii=False)

# 处理列表数据
users = [
{"id": 1, "name": "Alice", "email": "alice@example.com"},
{"id": 2, "name": "Bob", "email": "bob@example.com"},
{"id": 3, "name": "Charlie", "email": "charlie@example.com"}
]

with open("users.json", "w", encoding="utf-8") as file:
json.dump(users, file, indent=2, ensure_ascii=False)

# 读取并更新
with open("users.json", "r", encoding="utf-8") as file:
users = json.load(file)

# 添加新用户
users.append({"id": 4, "name": "David", "email": "david@example.com"})

# 保存
with open("users.json", "w", encoding="utf-8") as file:
json.dump(users, file, indent=2, ensure_ascii=False)

JSON 错误处理

import json

# 捕获 JSON 解析错误
try:
with open("data.json", "r", encoding="utf-8") as file:
data = json.load(file)
except json.JSONDecodeError as e:
print(f"JSON 解析错误: {e}")
print(f"错误位置: 行 {e.lineno}, 列 {e.colno}")
except FileNotFoundError:
print("文件不存在")

# 验证 JSON 格式
def validate_json(file_path):
    """Report whether ``file_path`` is a readable, well-formed JSON file.

    Args:
        file_path: Path of the file to check; it is read as UTF-8.

    Returns:
        True if the file exists and parses as JSON. False if it is missing,
        is not valid UTF-8, or contains malformed JSON.
    """
    try:
        with open(file_path, "r", encoding="utf-8") as handle:
            json.load(handle)
    except (json.JSONDecodeError, UnicodeDecodeError, FileNotFoundError):
        # An undecodable file is just as invalid as a malformed one, so it
        # is reported as False instead of escaping as an exception.
        return False
    return True

# 使用
if validate_json("data.json"):
print("JSON 格式正确")
else:
print("JSON 格式错误")

CSV 操作

CSV(Comma-Separated Values)是一种常用的表格数据存储格式。

读取 CSV

import csv

# 基本读取
with open("data.csv", "r", encoding="utf-8") as file:
reader = csv.reader(file)
for row in reader:
print(row)

# 读取到列表
with open("data.csv", "r", encoding="utf-8") as file:
reader = csv.reader(file)
data = list(reader)
print(data)

# 跳过表头
with open("data.csv", "r", encoding="utf-8") as file:
reader = csv.reader(file)
header = next(reader) # 读取表头
for row in reader:
print(row)

# 使用 DictReader(推荐)
with open("data.csv", "r", encoding="utf-8") as file:
reader = csv.DictReader(file)
for row in reader:
print(row["name"], row["age"])

# 获取字段名
with open("data.csv", "r", encoding="utf-8") as file:
reader = csv.DictReader(file)
print(reader.fieldnames) # ['name', 'age', 'city']

写入 CSV

import csv

# 写入列表数据
data = [
["Alice", 25, "New York"],
["Bob", 30, "London"],
["Charlie", 35, "Paris"]
]

with open("output.csv", "w", encoding="utf-8", newline="") as file:
writer = csv.writer(file)
writer.writerows(data)

# 写入带表头的数据
with open("output.csv", "w", encoding="utf-8", newline="") as file:
writer = csv.writer(file)
writer.writerow(["Name", "Age", "City"])
writer.writerows(data)

# 使用 DictWriter(推荐)
with open("output.csv", "w", encoding="utf-8", newline="") as file:
fieldnames = ["name", "age", "city"]
writer = csv.DictWriter(file, fieldnames=fieldnames)
writer.writeheader()
writer.writerows([
{"name": "Alice", "age": 25, "city": "New York"},
{"name": "Bob", "age": 30, "city": "London"}
])

# 注意:newline="" 防止在 Windows 上出现空行

CSV 进阶操作

import csv

# 指定分隔符
with open("data.csv", "r", encoding="utf-8") as file:
reader = csv.reader(file, delimiter=';') # 分号分隔
for row in reader:
print(row)

# 处理引号
with open("data.csv", "r", encoding="utf-8") as file:
reader = csv.reader(file, quotechar='"')
for row in reader:
print(row)

# 读取大文件(逐行处理)
with open("large_file.csv", "r", encoding="utf-8") as file:
reader = csv.reader(file)
header = next(reader)
for i, row in enumerate(reader):
if i >= 1000: # 只处理前1000行
break
print(row)

# 过滤数据
with open("data.csv", "r", encoding="utf-8") as file:
reader = csv.DictReader(file)
for row in reader:
if int(row["age"]) > 30: # 只处理年龄大于30的
print(row)

使用 pandas 处理 CSV

import pandas as pd

# 读取 CSV
df = pd.read_csv("data.csv", encoding="utf-8")
print(df.head()) # 查看前5行

# 只读取特定列
df = pd.read_csv("data.csv", usecols=["name", "age"])

# 过滤数据
df_filtered = df[df["age"] > 30]
print(df_filtered)

# 写入 CSV
df.to_csv("output.csv", index=False, encoding="utf-8")

# 追加数据
df.to_csv("output.csv", mode="a", header=False, index=False)

CSV 实用示例

import csv
from pathlib import Path

# Merge every CSV file in the current directory into merged.csv.
output_file = Path("merged.csv")
# Build the input list before the output file is opened, and leave the output
# out of it: glob() is lazy, so merged.csv (just created, or left over from a
# previous run) would otherwise be read as one of its own inputs. Sorting
# makes the merge order deterministic.
csv_files = sorted(
    path for path in Path(".").glob("*.csv") if path.name != output_file.name
)

with open(output_file, "w", encoding="utf-8", newline="") as f_out:
    writer = csv.writer(f_out)
    for i, csv_file in enumerate(csv_files):
        # newline="" lets the csv module handle line breaks inside quoted
        # fields itself.
        with open(csv_file, "r", encoding="utf-8", newline="") as f_in:
            reader = csv.reader(f_in)
            for j, row in enumerate(reader):
                # Keep the header row (j == 0) of the first file only.
                if i == 0 or j > 0:
                    writer.writerow(row)

# Transform rows from input.csv into output.csv.
# Both files are opened in a single with-statement under distinct names, so
# the reader's source file is guaranteed to stay open while rows are written
# and neither handle shadows the other.
with open("input.csv", "r", encoding="utf-8", newline="") as f_in, \
        open("output.csv", "w", encoding="utf-8", newline="") as f_out:
    reader = csv.DictReader(f_in)
    fieldnames = ["name", "age_upper", "city"]
    writer = csv.DictWriter(f_out, fieldnames=fieldnames)
    writer.writeheader()

    for row in reader:
        writer.writerow({
            "name": row["name"],
            "age_upper": row["age"].upper(),
            "city": row["city"],
        })

文件操作最佳实践

1. 使用 with 语句

# 推荐
with open("file.txt", "r") as file:
content = file.read()

# 不推荐
file = open("file.txt", "r")
content = file.read()
file.close()

2. 指定编码

# 推荐
with open("file.txt", "r", encoding="utf-8") as file:
content = file.read()

# 避免编码问题

3. 处理大文件

# 推荐:逐行处理
with open("large_file.txt", "r") as file:
for line in file:
process(line)

# 不推荐:一次性读取
with open("large_file.txt", "r") as file:
content = file.read() # 可能占用大量内存

4. 使用 pathlib

# 推荐(现代方式)
from pathlib import Path

path = Path("folder/file.txt")
content = path.read_text()
path.write_text("content")

# 不推荐(传统方式)
import os
path = os.path.join("folder", "file.txt")
with open(path, "r") as file:
content = file.read()

5. 错误处理

# 推荐
try:
with open("file.txt", "r") as file:
content = file.read()
except FileNotFoundError:
print("文件不存在")
except PermissionError:
print("没有权限")
except Exception as e:
print(f"发生错误: {e}")

小结

本章节介绍了 Python 的文件操作:

  • 文件读写: open(), with 语句, read(), write(), 文件指针
  • 文件模式: r, w, a, x, +, 二进制模式
  • 路径处理: os 模块, pathlib 模块(推荐)
  • JSON 操作: json.load(), json.dump(), 复杂JSON处理
  • CSV 操作: csv.reader, csv.writer, DictReader, DictWriter

掌握文件操作是处理数据持久化和数据交换的基础。下一章我们将学习异常处理,包括 try-except、自定义异常等。