# Python 3.7连接到MySQL数据库的模块推荐使用PyMySQL模块
# pip install pymysql
# /usr/local/mysql/support-files/mysql.server start
# 一般流程
# 开始-创建connection-获取cursor-CRUD(查询并获取数据)-关闭cursor-关闭connection-结束
import pymysql
dbInfo = {
'host' : 'localhost',
'port' : 3306,
'user' : 'root',
'password' : 'Sooele0000%',
'db' : 'world'
}
#数据库ip地址
#端口
#用户名
#密码
#数据库名
sqls = ['select 1', 'select VERSION()']
result = []
class ConnDB(object):
def __init__(self, dbInfo, sqls):
self.host = dbInfo['host']
self.port = dbInfo['port']
self.user = dbInfo['user']
self.password = dbInfo['password']
self.db = dbInfo['db']
self.sqls = sqls
self.run()
def run(self):
conn = pymysql.connect(
host = self.host,
port = self.port,
user = self.user,
password = self.password,
db = self.db
)
# 游标建立的时候就开启了一个隐形的事物
cur = conn.cursor()
try:
for command in self.sqls:
cur.execute(command)
result.append(cur.fetchone())
# 关闭游标
cur.close()
conn.commit()
except:
conn.rollback()
# 关闭数据库连接
conn.close()
if __name__ == "__main__":
db = ConnDB(dbInfo, sqls)
print(result)
# Python 3.7连接到MySQL数据库的模块推荐使用PyMySQL模块
# pip install pymysql
# /usr/local/mysql/support-files/mysql.server start
# 一般流程
# 开始-创建connection-获取cursor-CRUD(查询并获取数据)-关闭cursor-关闭connection-结束
import pymysql
dbInfo = {
'host' : 'localhost',
'port' : 3306,
'user' : 'root',
'password' : 'Sooele0000%',
'db' : 'world'
}
#数据库ip地址
#端口
#用户名
#密码
#数据库名
sqls = ['select 1', 'select VERSION()']
result = []
class ConnDB(object):
def __init__(self, dbInfo, sqls):
self.host = dbInfo['host']
self.port = dbInfo['port']
self.user = dbInfo['user']
self.password = dbInfo['password']
self.db = dbInfo['db']
self.sqls = sqls
self.run()
def run(self):
conn = pymysql.connect(
host = self.host,
port = self.port,
user = self.user,
password = self.password,
db = self.db
)
# 游标建立的时候就开启了一个隐形的事物
cur = conn.cursor()
try:
for command in self.sqls:
cur.execute(command)
result.append(cur.fetchone())
# 关闭游标
cur.close()
conn.commit()
except:
conn.rollback()
# 关闭数据库连接
conn.close()
if __name__ == "__main__":
db = ConnDB(dbInfo, sqls)
print(result)
# 执行批量插入
values = [(id,'testuser'+str(id)) for id in range(4, 21) ]
cursor.executemany('INSERT INTO '+ TABLE_NAME +' values(%s,%s)' ,values)
# 练习
# 1 pymysql是否是线程安全的?如何使用ThreadPoolExecutor与pymysql实现多线程访问数据库
##########表结构############
CREATE TABLE `数据库名`.`表名` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, `title` varchar(200) DEFAULT NULL COMMENT '文章标题', PRIMARY KEY (`id`) );
######## settings.py ##########
#mysql-config
MYSQL_HOST = 'localhost'
MYSQL_DBNAME = 'scrapy'
MYSQL_USER = 'root'
MYSQL_PASSWD ='rootroot'
MYSQL_PORT = 3306
######## pipelines.py ##########
# -*- coding: utf-8 -*-
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://doc.scrapy.org/en/latest/topics/item-pipeline.html
import json
from logging import Logger
from twisted.enterprise import adbapi
from blogscrapy.db.dbhelper import DBHelper
import codecs
from logging import log
from scrapy.utils.project import get_project_settings
class BlogscrapyPipeline(object):
def __init__(self):
self.file = open('blog.json', 'a+', encoding='utf-8')
self.db = DBHelper()
def process_item(self, item, spider):
content = json.dumps(dict(item), ensure_ascii=False) + "\n"
self.file.write(content)
self.db.insert(item)
return item
def close_spider(self, spider):
self.file.close()
######## DBHelper.py ##########
# -*- coding: utf-8 -*-
import pymysql
from twisted.enterprise import adbapi
from scrapy.utils.project import get_project_settings
class DBHelper():
def __init__(self):
settings = get_project_settings()
dbparams = dict(
host=settings['MYSQL_HOST'],
db=settings['MYSQL_DBNAME'],
user=settings['MYSQL_USER'],
passwd=settings['MYSQL_PASSWD'],
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor,
use_unicode=False,
)
# 将字典扩展为关键字参数
dbpool = adbapi.ConnectionPool('pymysql', **dbparams)
self.__dbpool = dbpool
def connect(self):
return self.__dbpool
def insert(self, item):
# 封装insert操作
sql = "insert into article(title) values(%s)"
query = self.__dbpool.runInteraction(self._conditional_insert, sql, item)
query.addErrback(self._handle_error)
return item
def _conditional_insert(self, canshu, sql, item):
# 传items的数据
params = (item['title'])
canshu.execute(sql, params)
def _handle_error(self, failue):
print(failue)
def __del__(self):
try:
self.__dbpool.close()
except Exception as ex:
print(ex)
一、数据库增删改操作
commit()方法:在数据库里增、删、改的时候,必须要进行提交,否则插入的数据不生效。
executemany():用来同时插入多条数据:
import pymysql
config={
"host":"127.0.0.1",
"user":"root",
"password":"LBLB1212@@",
"database":"dbforpymysql"
}
db = pymysql.connect(**config)
cursor = db.cursor()
sql = "INSERT INTO userinfo(username,passwd) VALUES(%s,%s)"
cursor.executemany(sql,[("tom","123"),("alex",'321')])
db.commit() #提交数据
cursor.close()
db.close()
execute()和executemany()都会返回受影响的行数:
sql = "delete from userinfo where username=%s"
res = cursor.executemany(sql,("jack",))
print("res=",res)
#运行结果
res= 1
当表中有自增的主键的时候,可以使用lastrowid来获取最后一次自增的ID:
import pymysql
config={
"host":"127.0.0.1",
"user":"root",
"password":"LBLB1212@@",
"database":"dbforpymysql"
}
db = pymysql.connect(**config)
cursor = db.cursor()
sql = "INSERT INTO userinfo(username,passwd) VALUES(%s,%s)"
cursor.execute(sql,("zed","123"))
print("the last rowid is ",cursor.lastrowid)
db.commit() #提交数据
cursor.close()
db.close()
#运行结果
the last rowid is 10
二、数据库的查询操作
fetchone():获取下一行数据,第一次为首行;
fetchall():获取所有行数据源
fetchmany(4):获取下4行数据
import pymysql
config={
"host":"127.0.0.1",
"user":"root",
"password":"@@@@@",
"database":"dbforpymysql"
}
db = pymysql.connect(**config)
cursor = db.cursor()
sql = "SELECT * FROM userinfo"
cursor.execute(sql)
res = cursor.fetchone() #第一次执行
print(res)
res = cursor.fetchone() #第二次执行
print(res)
cursor.close()
db.close()
import pymysql
config={
"host":"127.0.0.1",
"user":"root",
"password":"@@@@@",
"database":"dbforpymysql"
}
db = pymysql.connect(**config)
cursor = db.cursor()
sql = "SELECT * FROM userinfo"
cursor.execute(sql)
res = cursor.fetchall() #第一次执行
print(res)
res = cursor.fetchall() #第二次执行
print(res)
cursor.close()
db.close()
可以看到,第二次获取的时候,什么数据都没有获取到,这个类似于文件的读取操作。
默认情况下,我们获取到的返回值是元组,只能看到每行的数据,却不知道每一列代表的是什么,这个时候可以使用以下方式来返回字典,每一行的数据都会生成一个字典:
cursor = db.cursor(cursor=pymysql.cursors.DictCursor) #在实例化的时候,将属性cursor设置为pymysql.cursors.DictCursor
import pymysql
config={
"host":"127.0.0.1",
"user":"root",
"password":"LBLB1212@@",
"database":"dbforpymysql"
}
db = pymysql.connect(**config)
cursor = db.cursor(cursor=pymysql.cursors.DictCursor)
sql = "SELECT * FROM userinfo"
cursor.execute(sql)
res = cursor.fetchall()
print(res)
cursor.close()
db.close()
这样获取到的内容就能够容易被理解和使用了!
在获取行数据的时候,可以理解开始的时候,有一个行指针指着第一行的上方,获取一行,它就向下移动一行,所以当行指针到最后一行的时候,就不能再获取到行的内容,所以我们可以使用如下方法来移动行指针:
cursor.scroll(1,mode='relative') # 相对当前位置移动
cursor.scroll(2,mode='absolute') # 相对绝对位置移动
第一个值为移动的行数,整数为向下移动,负数为向上移动,mode指定了是相对当前位置移动,还是相对于首行移动
sql = "SELECT * FROM userinfo"
cursor.execute(sql)
res = cursor.fetchall()
print(res)
cursor.scroll(0,mode='absolute') #相对首行移动了0,就是把行指针移动到了首行
res = cursor.fetchall() #第二次获取到的内容
print(res)
#运行结果
[{'id': 1, 'username': 'frank', 'passwd': '123'}, {'id': 2, 'username': 'rose', 'passwd': '321'}, {'id': 3, 'username': 'jeff', 'passwd': '666'}, {'id': 5, 'username': 'bob', 'passwd': '123'}, {'id': 8, 'username': 'jack', 'passwd': '123'}, {'id': 10, 'username': 'zed', 'passwd': '123'}]
[{'id': 1, 'username': 'frank', 'passwd': '123'}, {'id': 2, 'username': 'rose', 'passwd': '321'}, {'id': 3, 'username': 'jeff', 'passwd': '666'}, {'id': 5, 'username': 'bob', 'passwd': '123'}, {'id': 8, 'username': 'jack', 'passwd': '123'}, {'id': 10, 'username': 'zed', 'passwd': '123'}]
在python的文件操作中支持上下文管理器,在操作数据库的时候也可以使用:
import pymysql
config={
"host":"127.0.0.1",
"user":"root",
"password":"LBLB1212@@",
"database":"dbforpymysql"
}
db = pymysql.connect(**config)
with db.cursor(cursor=pymysql.cursors.DictCursor) as cursor: #获取数据库连接的对象
sql = "SELECT * FROM userinfo"
cursor.execute(sql)
res = cursor.fetchone()
print(res)
cursor.scroll(2,mode='relative')
res = cursor.fetchone()
print(res)
cursor.close()
db.close()
#运行结果
{'id': 1, 'username': 'frank', 'passwd': '123'}
{'id': 5, 'username': 'bob', 'passwd': '123'}