编辑推荐: |
本文来自于博客,本文详细介绍了如何使用Python操作MySQL、使用Python操作Redis及项目实战。 |
|
操作MySQL
1)Windows中安装python和pycharm
2)ubuntu中安装python和pycharm
安装步骤不做赘述,pycharm运行脚本
#!/usr/bin/env python
import MySQLdb
#get connection
try:
con = MySQLdb.connect(
host='localhost',
user='root',
passwd='12346',
port=3308,
db='sakila',
charset='utf8'
)
except MySQLdb.Error as e:
print('error:%s'% e)
cursor = con.cursor()
cursor.execute('SELECT * FROM `store`')
rest = cursor.fetchone()
print(rest)
#close connection
con.close() |
3)查询数据库
#!/usr/bin/env python
import MySQLdb
class MysqlQuery(object):
def __init__(self):
self.get_conn()
def get_conn(self):
# get connection
try:
self.conn = MySQLdb.connect(
host='localhost',
user='root',
passwd='123456',
port=3308,
db='sakila',
charset='utf8'
)
except MySQLdb.Error as e:
print('error:%s' % e)
cursor = self.conn.cursor()
cursor.execute('SELECT * FROM `store`')
rest = cursor.fetchone()
print(rest)
def close_conn(self):
try:
if self.conn:
# close connection
self.conn.close()
except MySQLdb.Error as e:
print('Error:%s'%e)
def get_one(self):
#prepare SQL
/*虽然定义类型为int,可使用string*/
sql='SELECT * FROM `store` where `store_id` =%s'
#get cursor
cursor=self.conn.cursor()
cursor.execute(sql,('1',))
print(cursor.rowcount)
rest=dict(zip([k[0] for k in cursor.description],cursor.fetchone()))
print(rest)
print(rest['store_id'])
self.close_conn()
return rest
def main():
obj = MysqlQuery()
rest = obj.get_one();
print(rest['store_id'])
if __name__ == '__main__':
main()
/*取走所有数据,形成数组*/
rest = [dict(zip([k[0] for k in cursor.description],row))for row in cursor.fetchall()]
|
zip([iterable, …])
Python的一个内建函数,它接受一系列可迭代的对象作为参数,将对象中对应的元素打包成一个个tuple(元组),然后返回由这些tuples组成的list(列表)。若传入参数的长度不等,则返回list的长度和参数中长度最短的对象相同。利用*号操作符,可以将list unzip(解压)。
dict()作用:dict() 函数用于创建一个字典。返回一个字典。
class dict(**kwarg)
class dict(mapping, **kwarg)
class dict(iterable, **kwarg)
/*
kwargs -- 关键字
mapping -- 元素的容器。
iterable -- 可迭代对象
*/ |
4)更新数据
def add_one(self):
row_count=0
try:
sql = ("insert into `film`(`title`,`description`,`language_id`) value" "(%s,%s,%s);")
cursor = self.conn.cursor()
cursor.execute(sql, ('chia', 'ashajhsjah','1'))
self.conn.commit()
except:
print('error')
self.conn.rollback()
row_count=cursor.rowcount
cursor.close()
self.close_conn()
return row_count |
5)ORM:SQLAlChemy
pip install SQLAlchemy
import sqlalchemy |
declarative_base() 创建了一个 BaseModel 类,这个类的子类可以自动与一个表关联。
增删改查
#!/usr/bin/python
#coding=utf-8
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy import Column, Integer, String, DateTime, Boolean
import datetime
#engine=create_engine('数据库类型://用户名:密码@ip:端口/数据库名')
engine=create_engine('mysql://root:123456@localhost:3308/sakila')
Base=declarative_base()
Session = sessionmaker(bind=engine)
class Store(Base):
__tablename__='country'#数据库表名
country_id = Column(Integer, primary_key=True)
country = Column(String(200), nullable=False)
last_update = Column(String(2000), nullable=False)
class MysqlOrmTest(object):
def __init__(self):
self.session=Session()
def add_one(self):
new_obj=Store(
country_id='130',
country='hhhsahsa',
last_update=datetime.datetime.now()#此处需要import datetime
)
self.session.add(new_obj)
self.session.commit()
return new_obj
def get_one(self):
return self.session.query(Store).get(1)
def update_date(self):
obj=self.session.query(Store).get(1)
obj.manager_staff_id=1
self.session.add(obj)
self.session.commit()
return obj
def delete_data(self):
data=self.session.query(Store).get(3)
self.session.delete(data)
self.session.commit()
def main():
# rest = obj.add_one()
# print(dir(rest))
# print(obj.get_one().title)
# print(obj.get_more().count())
# for row in obj.get_more():
# print(row.title)
# print(obj.update_data())
if __name__ == '__main__':
main()
|
6)项目实战
使用pycharm专业版,选择flask框架,代码如下:
from flask import Flask
app = Flask(__name__)
@app.route('/hello')
def hello_world():
return 'Hello World!hello'
if __name__ == '__main__':
app.run(debug=True)
##flask支持热部署 |
简单搭建flask架构网站
本人使用pycharm开发flask项目,可以利用工具导入工具包:
##引入相关包
from flask_sqlalchemy import SQLAlchemy
from flask import Flask, render_template, flash, redirect, url_for, abort, request
app = Flask(__name__)
##配置数据库
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:123456@localhost:3308/flaskdb'
db=SQLAlchemy(app)
##完成ORM映射
class News(db.Model):
__tablename__='news'
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(200), nullable=False)
img_url = db.Column(db.String(2000), nullable=False)
content = db.Column(db.String(2000), nullable=True)
is_valid = db.Column(db.String(2000), nullable=True)
created_at = db.Column(db.String(2000), nullable=True)
updated_at = db.Column(db.String(2000), nullable=True)
news_type = db.Column(db.String(2000), nullable=True)
def __repr__(self):
return '<News %r>' % self.title
##python生成HTML效果不好
@app.route('/hello')
def hello_world():
return 'Hello World!hello'
##使用渲染引擎Jinja2
@app.route('/')
def index():
news_list=News.query.all()
return render_template("index.html",news_list=news_list)
@app.route('/cat/<name>/')
def cat(name):
news_list=News.query.filter(News.news_type==name)
return render_template('cat.html',new_list=news_list)
@app.route('/detail/<int:pk>/')
def detail(pk):
new_obj=News.query.get(pk)
return render_template('detail.html',new_obj=new_obj)
@app.route('/admin/')
@app.route('/admin/<int:page>/')
def admin(page=None):
return render_template("admin/index.html")
@app.route('/admin/add/', methods=['GET', 'POST'])
def add():
return render_template("admin/add.html")
@app.route('/admin/update/<int:pk>/', methods=['GET', 'POST'])
def update(pk):
return render_template("admin/update.html")
@app.route('/admin/delete/<int:pk>/', methods=['POST'])
def delete(pk):
return 'no'
if __name__ == '__main__':
app.run(debug=True) |
操作Redis
1) Redis安装
sudo apt-get update
sudo apt-get install redis-server
##启动Redis服务器
redis-server
##查看 redis 是否启动?
redis-cli |
2)Redis命令
Set animal 'cat'
get animal
##添加value
append animal 'dog'
mset user1 'chu' user2 'yao'
mget user1 user2
set num 9
incr/decr num /*增加减少1*/
set user:chuyao;age:45 'asasasasa' |
列表(list)相关操作
lpush/rpush q1 'chu' 'yao' 'Amy'/*从左、右插入数据*/
lrange/*获取指定长度的数据*/
ltrim/*截取一定长度的数据*/
lpop/rpop/*移除最左、右的元素并返回*/
lpushx/rpushx --key/* key存在时候才插入数据,不存在时不做任何处理*/ |
集合(Set)相关操作
sadd/srem /*添加、删除元素*/
sismember /*判断是否为set的一个元素*/
smembers /*返回该集合的所有成员*/
sdiff /*返回一个集合与其他集合的差异*/
sinter/*返回几个集合的交集*/
sunion/*返回几个集合的并集*/ |
散列(hash)相关操作
3)redis-py连接
import redis
r=redis.StrictRedis(host='120.95.132.174',port=6379,db=0)
user1=r.get('user1')
print(user1) |
注意,如果是远程连接数据库,需要修改Redis配置文件。
1)注释掉bind 127.0.0.1可以使所有的ip访问redis。
2)修改办法:protected-mode no
4)Python 操作String类型
import redis
class TestString(object):
def __init__(self):
self.r=redis.StrictRedis(host='120.95.132.174',port=6379,db=0)
def test_set(self):
rest=self.r.set('user2','Amy');
print(rest)
def test_get(self):
rest=self.r.get('user1')
print rest
return rest
def test_mset(self):
d={
'user3':'Bob',
'user4':'BobX'
}
rest=self.r.mset(d)
print(rest)
return rest
def test_mget(self):
l=['user1','user2']
rest=self.r.mget(l)
print(rest)
return rest
def test_del(self):
rest=self.r.delete('user1')
print (rest)
def main():
str_obj=TestString();
# str_obj.test_set();
str_obj.test_get();
# str_obj.test_mset();
# str_obj.test_mget();
# str_obj.test_del();
if __name__=='__main__':
main() |
5)项目实战
新闻数据,Hash
新闻ID,String
分页数据,List
排序,Sorted Set
|