大家好,今天我们来聊聊Python操作MySQL数据库这个话题。别看这玩意儿简单,但新手朋友经常在这上面摔跟头。
今天我就手把手教你,从零开始学习使用PyMySQL操作MySQL数据库,让你一次掌握,从此告别"Python连接数据库总出错"的尴尬!
为什么选择PyMySQL?
在开始学习之前,先聊聊为什么我们要选择PyMySQL:
- 纯Python实现:完全用Python编写,不需要额外的C库依赖
- 兼容性好:支持Python 3.6+和MySQL 5.6+
安装PyMySQL
环境要求
在安装之前,确保你已经:
安装步骤
# 使用pip安装PyMySQL
pip install PyMySQL
# 验证安装是否成功
python -c "import pymysql; print(pymysql.__version__)"
如果看到版本号输出,说明安装成功。
建立数据库连接
基本连接方式
import pymysql
# 建立数据库连接
connection = pymysql.connect(
host='localhost', # 数据库主机地址
port=3306, # 数据库端口
user='root', # 用户名
password='your_password', # 密码
database='test_db', # 数据库名
charset='utf8mb4' # 字符集
)
# 关闭连接
connection.close()
连接参数详解
import pymysql
# 完整的连接参数配置
connection = pymysql.connect(
host='localhost', # 主机地址
port=3306, # 端口号
user='your_username', # 用户名
password='your_password', # 密码
database='your_database', # 数据库名
charset='utf8mb4', # 字符集
autocommit=False, # 是否自动提交
connect_timeout=10, # 连接超时时间
read_timeout=10, # 读取超时时间
write_timeout=10, # 写入超时时间
cursorclass=pymysql.cursors.DictCursor # 游标类型
)
使用上下文管理器
推荐使用with语句管理连接,确保连接正确关闭:
import pymysql
# 使用with语句自动管理连接
with pymysql.connect(
host='localhost',
user='root',
password='your_password',
database='test_db',
charset='utf8mb4'
) as connection:
# 在这里执行数据库操作
pass
# 连接会自动关闭
创建游标对象
游标对象用于执行SQL语句和获取结果:
import pymysql
# 建立连接
connection = pymysql.connect(
host='localhost',
user='root',
password='your_password',
database='test_db',
charset='utf8mb4'
)
# 创建游标对象
cursor = connection.cursor()
# 执行SQL语句
cursor.execute("SELECT VERSION()")
# 获取结果
result = cursor.fetchone()
print(f"MySQL版本: {result[0]}")
# 关闭游标和连接
cursor.close()
connection.close()
不同类型的游标
import pymysql.cursors
# 默认游标 - 返回元组
with pymysql.connect(...) as connection:
cursor = connection.cursor()
cursor.execute("SELECT * FROM users")
result = cursor.fetchall() # ((1, 'Alice', 25), (2, 'Bob', 30))
# 字典游标 - 返回字典
with pymysql.connect(
cursorclass=pymysql.cursors.DictCursor
) as connection:
cursor = connection.cursor()
cursor.execute("SELECT * FROM users")
result = cursor.fetchall() # [{'id': 1, 'name': 'Alice', 'age': 25}, ...]
# 字典游标的优势是可以通过键名访问字段
print(result[0]['name']) # 输出: Alice
数据库基本操作
创建数据库和表
import pymysql
# 连接MySQL服务器(不指定数据库)
connection = pymysql.connect(
host='localhost',
user='root',
password='your_password',
charset='utf8mb4'
)
try:
with connection.cursor() as cursor:
# 创建数据库
cursor.execute("CREATE DATABASE IF NOT EXISTS myapp")
# 选择数据库
cursor.execute("USE myapp")
# 创建用户表
create_table_sql = """
CREATE TABLE IF NOT EXISTS users (
id INT AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(50) NOT NULL UNIQUE,
email VARCHAR(100) NOT NULL UNIQUE,
age INT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
"""
cursor.execute(create_table_sql)
# 提交事务
connection.commit()
finally:
connection.close()
插入数据
import pymysql
from datetime import datetime
# 插入单条数据
def insert_user(username, email, age):
connection = pymysql.connect(
host='localhost',
user='root',
password='your_password',
database='myapp',
charset='utf8mb4'
)
try:
with connection.cursor() as cursor:
# 使用参数化查询防止SQL注入
sql = "INSERT INTO users (username, email, age) VALUES (%s, %s, %s)"
cursor.execute(sql, (username, email, age))
# 提交事务
connection.commit()
print("用户插入成功")
except Exception as e:
# 回滚事务
connection.rollback()
print(f"插入失败: {e}")
finally:
connection.close()
# 调用函数
insert_user("张三", "zhangsan@example.com", 25)
批量插入数据
import pymysql
# 批量插入数据
def batch_insert_users(users_data):
connection = pymysql.connect(
host='localhost',
user='root',
password='your_password',
database='myapp',
charset='utf8mb4'
)
try:
with connection.cursor() as cursor:
sql = "INSERT INTO users (username, email, age) VALUES (%s, %s, %s)"
# executemany用于批量插入
cursor.executemany(sql, users_data)
connection.commit()
print(f"成功插入{len(users_data)}条用户数据")
except Exception as e:
connection.rollback()
print(f"批量插入失败: {e}")
finally:
connection.close()
# 准备批量数据
users = [
("李四", "lisi@example.com", 28),
("王五", "wangwu@example.com", 32),
("赵六", "zhaoliu@example.com", 29)
]
# 执行批量插入
batch_insert_users(users)
查询数据
import pymysql
# 查询单条数据
def get_user_by_id(user_id):
connection = pymysql.connect(
host='localhost',
user='root',
password='your_password',
database='myapp',
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
try:
with connection.cursor() as cursor:
sql = "SELECT * FROM users WHERE id = %s"
cursor.execute(sql, (user_id,))
result = cursor.fetchone()
return result
finally:
connection.close()
# 查询多条数据
def get_users_by_age(min_age):
connection = pymysql.connect(
host='localhost',
user='root',
password='your_password',
database='myapp',
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
try:
with connection.cursor() as cursor:
sql = "SELECT * FROM users WHERE age >= %s ORDER BY age"
cursor.execute(sql, (min_age,))
results = cursor.fetchall()
return results
finally:
connection.close()
# 使用示例
user = get_user_by_id(1)
if user:
print(f"用户信息: {user}")
users = get_users_by_age(25)
print("年龄大于等于25的用户:")
for user in users:
print(f" {user['username']}, {user['age']}岁")
更新数据
import pymysql
# 更新用户信息
def update_user(user_id, username=None, email=None, age=None):
connection = pymysql.connect(
host='localhost',
user='root',
password='your_password',
database='myapp',
charset='utf8mb4'
)
try:
with connection.cursor() as cursor:
# 动态构建更新语句
updates = []
params = []
if username:
updates.append("username = %s")
params.append(username)
if email:
updates.append("email = %s")
params.append(email)
if age is not None:
updates.append("age = %s")
params.append(age)
if not updates:
print("没有需要更新的字段")
return
# 添加WHERE条件参数
params.append(user_id)
sql = f"UPDATE users SET {', '.join(updates)} WHERE id = %s"
cursor.execute(sql, params)
connection.commit()
print("用户信息更新成功")
except Exception as e:
connection.rollback()
print(f"更新失败: {e}")
finally:
connection.close()
# 使用示例
update_user(1, age=26, email="newemail@example.com")
删除数据
import pymysql
# 删除用户
def delete_user(user_id):
connection = pymysql.connect(
host='localhost',
user='root',
password='your_password',
database='myapp',
charset='utf8mb4'
)
try:
with connection.cursor() as cursor:
sql = "DELETE FROM users WHERE id = %s"
cursor.execute(sql, (user_id,))
connection.commit()
print("用户删除成功")
except Exception as e:
connection.rollback()
print(f"删除失败: {e}")
finally:
connection.close()
# 使用示例
delete_user(1)
事务处理
基本事务操作
import pymysql
# 事务示例:转账操作
def transfer_money(from_user_id, to_user_id, amount):
connection = pymysql.connect(
host='localhost',
user='root',
password='your_password',
database='myapp',
charset='utf8mb4'
)
try:
with connection.cursor() as cursor:
# 开始事务
connection.begin()
# 检查转出用户余额
cursor.execute("SELECT balance FROM accounts WHERE user_id = %s", (from_user_id,))
from_balance = cursor.fetchone()
if not from_balance or from_balance[0] < amount:
raise Exception("余额不足")
# 扣除转出用户余额
cursor.execute("UPDATE accounts SET balance = balance - %s WHERE user_id = %s",
(amount, from_user_id))
# 增加转入用户余额
cursor.execute("UPDATE accounts SET balance = balance + %s WHERE user_id = %s",
(amount, to_user_id))
# 记录转账日志
cursor.execute("""
INSERT INTO transfer_logs (from_user_id, to_user_id, amount, transfer_time)
VALUES (%s, %s, %s, NOW())
""", (from_user_id, to_user_id, amount))
# 提交事务
connection.commit()
print("转账成功")
except Exception as e:
# 回滚事务
connection.rollback()
print(f"转账失败: {e}")
finally:
connection.close()
# 使用示例
transfer_money(1, 2, 100.00)
高级查询技巧
分页查询
import pymysql
# 分页查询用户列表
def get_users_paginated(page, page_size):
connection = pymysql.connect(
host='localhost',
user='root',
password='your_password',
database='myapp',
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
try:
with connection.cursor() as cursor:
offset = (page - 1) * page_size
sql = "SELECT * FROM users LIMIT %s OFFSET %s"
cursor.execute(sql, (page_size, offset))
users = cursor.fetchall()
# 获取总记录数
cursor.execute("SELECT COUNT(*) as total FROM users")
total = cursor.fetchone()['total']
return {
'users': users,
'total': total,
'page': page,
'page_size': page_size,
'total_pages': (total + page_size - 1) // page_size
}
finally:
connection.close()
# 使用示例
result = get_users_paginated(1, 10)
print(f"第{result['page']}页,共{result['total_pages']}页")
for user in result['users']:
print(f" {user['username']}")
条件查询
import pymysql
# 复杂条件查询
def search_users(username=None, min_age=None, max_age=None, email_domain=None):
connection = pymysql.connect(
host='localhost',
user='root',
password='your_password',
database='myapp',
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
try:
with connection.cursor() as cursor:
# 动态构建查询条件
conditions = []
params = []
if username:
conditions.append("username LIKE %s")
params.append(f"%{username}%")
if min_age is not None:
conditions.append("age >= %s")
params.append(min_age)
if max_age is not None:
conditions.append("age <= %s")
params.append(max_age)
if email_domain:
conditions.append("email LIKE %s")
params.append(f"%@{email_domain}")
# 构建SQL语句
if conditions:
sql = f"SELECT * FROM users WHERE {' AND '.join(conditions)} ORDER BY created_at DESC"
else:
sql = "SELECT * FROM users ORDER BY created_at DESC"
cursor.execute(sql, params)
results = cursor.fetchall()
return results
finally:
connection.close()
# 使用示例
users = search_users(username="张", min_age=20, max_age=30)
print("搜索结果:")
for user in users:
print(f" {user['username']}, {user['age']}岁, {user['email']}")
错误处理和最佳实践
异常处理
import pymysql
from pymysql import IntegrityError, OperationalError
# 完整的错误处理示例
def safe_database_operation():
connection = None
try:
# 建立连接
connection = pymysql.connect(
host='localhost',
user='root',
password='your_password',
database='myapp',
charset='utf8mb4',
autocommit=False,
connect_timeout=5
)
with connection.cursor() as cursor:
# 执行数据库操作
sql = "INSERT INTO users (username, email, age) VALUES (%s, %s, %s)"
cursor.execute(sql, ("test_user", "test@example.com", 25))
# 提交事务
connection.commit()
print("操作成功")
except IntegrityError as e:
# 处理数据完整性错误(如唯一约束冲突)
if connection:
connection.rollback()
print(f"数据完整性错误: {e}")
except OperationalError as e:
# 处理操作错误(如连接失败)
print(f"数据库操作错误: {e}")
except Exception as e:
# 处理其他异常
if connection:
connection.rollback()
print(f"未知错误: {e}")
finally:
# 确保连接关闭
if connection:
connection.close()
# 调用函数
safe_database_operation()
连接池使用
import pymysql
from dbutils.pooled_db import PooledDB
# 创建连接池
pool = PooledDB(
creator=pymysql,
host='localhost',
port=3306,
user='root',
password='your_password',
database='myapp',
charset='utf8mb4',
maxconnections=20, # 最大连接数
mincached=2, # 最小缓存连接数
maxcached=5, # 最大缓存连接数
maxshared=3, # 最大共享连接数
blocking=True, # 连接池满时是否阻塞等待
maxusage=None, # 单个连接最大复用次数
setsession=[], # 开始会话前执行的命令列表
ping=0 # ping MySQL服务端
)
# 使用连接池
def get_user_with_pool(user_id):
# 从连接池获取连接
connection = pool.connection()
try:
with connection.cursor() as cursor:
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
result = cursor.fetchone()
return result
finally:
# 关闭连接(归还到连接池)
connection.close()
# 使用示例
user = get_user_with_pool(1)
if user:
print(f"用户信息: {user}")
安全最佳实践
防止SQL注入
import pymysql
# 正确的做法:使用参数化查询
def safe_query(username):
connection = pymysql.connect(
host='localhost',
user='root',
password='your_password',
database='myapp',
charset='utf8mb4'
)
try:
with connection.cursor() as cursor:
# 使用参数化查询,防止SQL注入
sql = "SELECT * FROM users WHERE username = %s"
cursor.execute(sql, (username,))
result = cursor.fetchall()
return result
finally:
connection.close()
# 错误的做法:字符串拼接(容易被SQL注入)
def unsafe_query(username):
connection = pymysql.connect(
host='localhost',
user='root',
password='your_password',
database='myapp',
charset='utf8mb4'
)
try:
with connection.cursor() as cursor:
# 危险!容易被SQL注入攻击
sql = f"SELECT * FROM users WHERE username = '{username}'"
cursor.execute(sql) # 不要这样做!
result = cursor.fetchall()
return result
finally:
connection.close()
# 安全查询示例
users = safe_query("admin'; DROP TABLE users; --")
print(users) # 只会查找用户名为 "admin'; DROP TABLE users; --" 的用户
密码安全管理
import pymysql
import os
from dotenv import load_dotenv
# 加载环境变量
load_dotenv()
# 从环境变量获取数据库配置
def get_db_config():
return {
'host': os.getenv('DB_HOST', 'localhost'),
'port': int(os.getenv('DB_PORT', 3306)),
'user': os.getenv('DB_USER', 'root'),
'password': os.getenv('DB_PASSWORD'),
'database': os.getenv('DB_NAME', 'myapp'),
'charset': 'utf8mb4'
}
# 使用环境变量连接数据库
def connect_with_env():
config = get_db_config()
connection = pymysql.connect(**config)
return connection
# 创建.env文件示例:
"""
DB_HOST=localhost
DB_PORT=3306
DB_USER=myuser
DB_PASSWORD=mypassword
DB_NAME=myapp
"""
性能优化技巧
批量操作优化
import pymysql
# 高效的批量插入
def efficient_batch_insert(data_list):
connection = pymysql.connect(
host='localhost',
user='root',
password='your_password',
database='myapp',
charset='utf8mb4'
)
try:
with connection.cursor() as cursor:
# 使用事务和批量插入提高性能
connection.begin()
sql = "INSERT INTO users (username, email, age) VALUES (%s, %s, %s)"
cursor.executemany(sql, data_list)
connection.commit()
print(f"批量插入{len(data_list)}条记录成功")
except Exception as e:
connection.rollback()
print(f"批量插入失败: {e}")
finally:
connection.close()
# 准备大量数据进行测试
large_data = [
(f"user_{i}", f"user_{i}@example.com", 20 + (i % 50))
for i in range(1000)
]
# 执行批量插入
efficient_batch_insert(large_data)
查询优化
import pymysql
# 使用索引优化查询
def optimized_query():
connection = pymysql.connect(
host='localhost',
user='root',
password='your_password',
database='myapp',
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
try:
with connection.cursor() as cursor:
# 为经常查询的字段创建索引
cursor.execute("CREATE INDEX idx_username ON users(username)")
cursor.execute("CREATE INDEX idx_email ON users(email)")
cursor.execute("CREATE INDEX idx_age ON users(age)")
# 使用EXPLAIN分析查询性能
cursor.execute("EXPLAIN SELECT * FROM users WHERE username = 'admin'")
explain_result = cursor.fetchall()
print("查询执行计划:")
for row in explain_result:
print(row)
finally:
connection.close()
结语
到这里,PyMySQL操作数据库的全面学习就完成了!从安装配置、基本操作到高级技巧,每一步都详细讲解了。
记住几个关键点:
PyMySQL是Python操作MySQL数据库的强大工具,掌握它对于后端开发人员来说至关重要。
阅读原文:原文链接
该文章在 2025/12/10 18:44:33 编辑过