B站微服务框架Kratos详细教程(7)- 数据库
开始使用由于kratos使用了wire依赖注入框架,开始使用前,建议先了解相关教程:依赖注入wire使用详解配置打开配置文件configs/mysql.toml,修改为自己的服务器配置:[Client]addr = "127.0.0.1:3306"dsn = "{user}:{password}@tcp(127.0.0.1:3306)/{database}?timeout=1s&readT
开始使用
由于kratos
使用了wire
依赖注入框架,开始使用前,建议先了解相关教程:依赖注入wire使用详解
配置
打开配置文件configs/db.toml(文件名需与下文NewDB中paladin.Get("db.toml")读取的名称保持一致)
,修改为自己的服务器配置:
# MySQL client settings. The tutorial's NewDB decodes this Client table into sql.Config.
[Client]
# Connection address of the MySQL server.
addr = "127.0.0.1:3306"
# DSN used for writes; replace {user}, {password} and {database} with real values.
dsn = "{user}:{password}@tcp(127.0.0.1:3306)/{database}?timeout=1s&readTimeout=1s&writeTimeout=1s&parseTime=true&loc=Local&charset=utf8mb4,utf8"
# DSNs used for reads; when configured, read operations prefer these connections. One address is enough.
readDSN = ["{user}:{password}@tcp(127.0.0.2:3306)/{database}?timeout=1s&readTimeout=1s&writeTimeout=1s&parseTime=true&loc=Local&charset=utf8mb4,utf8"]
# Maximum number of connections in the pool.
active = 20
# Number of idle connections kept in the pool.
idle = 10
# Timeouts. Presumably idle-connection, query, exec and transaction timeouts respectively;
# TODO confirm against the sql.Config field documentation.
idleTimeout ="4h"
queryTimeout = "200ms"
execTimeout = "300ms"
tranTimeout = "400ms"
在该配置文件中我们可以配置mysql的读和写的dsn、连接地址addr、连接池的闲置连接数idle、最大连接数active以及各类超时。
如果配置了readDSN,在进行读操作的时候会优先使用readDSN的连接,readDSN可以只配一个地址。
初始化
打开文件internal/dao/db.go
package dao
import (
"context"
"dbserver/internal/model"
"github.com/go-kratos/kratos/pkg/conf/paladin"
"github.com/go-kratos/kratos/pkg/database/sql"
)
// NewDB builds the MySQL connection pool from the "Client" table of db.toml.
//
// It returns the pool, a cleanup function that closes the pool, and any error
// raised while loading or decoding the configuration. On error both db and cf
// are nil.
func NewDB() (db *sql.DB, cf func(), err error) {
	// Load the raw TOML document first.
	var raw paladin.TOML
	if err = paladin.Get("db.toml").Unmarshal(&raw); err != nil {
		return nil, nil, err
	}

	// Decode only the Client table into the pool configuration.
	var conf sql.Config
	if err = raw.Get("Client").UnmarshalTOML(&conf); err != nil {
		return nil, nil, err
	}

	// NewMySQL initialises the connection pool object.
	db = sql.NewMySQL(&conf)
	cf = func() { db.Close() }
	return db, cf, nil
}
// RawArticle is a placeholder for loading an article by id from the database.
// It currently performs no query and always returns (nil, nil).
func (d *dao) RawArticle(ctx context.Context, id int64) (art *model.Article, err error) {
	// get data from db
	return nil, nil
}
打开文件internal/dao/dao.go
在该文件中的New
方法接收外部数据库连接池对象db *sql.DB
, 也可以像官方文档说的, 直接在dao
中初始化
这里涉及到依赖注入, 具体可以查看di/wire_gen.go
文件
依赖注入相关可查看这篇文章: 依赖注入wire使用详解
package dao
import (
"context"
"time"
"dbserver/internal/model"
"github.com/go-kratos/kratos/pkg/cache/memcache"
"github.com/go-kratos/kratos/pkg/cache/redis"
"github.com/go-kratos/kratos/pkg/conf/paladin"
"github.com/go-kratos/kratos/pkg/database/sql"
"github.com/go-kratos/kratos/pkg/sync/pipeline/fanout"
xtime "github.com/go-kratos/kratos/pkg/time"
"github.com/google/wire"
)
// Provider is the wire provider set for this package: it declares the
// constructors (New, NewDB, NewRedis, NewMC) available for dependency injection.
var Provider = wire.NewSet(New, NewDB, NewRedis, NewMC)
//go:generate kratos tool genbts
// Dao is the data-access interface exposed by this package.
type Dao interface {
	// Close releases the resources held by the implementation.
	Close()
	// Ping reports whether the underlying resources are reachable.
	Ping(ctx context.Context) (err error)
	// Article returns the article identified by id.
	// The "bts:" line below is a directive for the generator named in go:generate above.
	// bts: -nullcache=&model.Article{ID:-1} -check_null_code=$!=nil&&$.ID==-1
	Article(c context.Context, id int64) (*model.Article, error)
}
// dao is the concrete data-access object; newDao fills every field.
type dao struct {
	db         *sql.DB            // MySQL connection pool passed in through New
	redis      *redis.Redis       // Redis client passed in through New
	mc         *memcache.Memcache // Memcache client passed in through New
	cache      *fanout.Fanout     // fanout instance created as fanout.New("cache")
	demoExpire int32              // DemoExpire from application.toml, in whole seconds
}
// New builds the Dao from connection objects supplied by the caller (the
// dependency-injection layer) instead of creating them itself.
//
// It returns the Dao, a cleanup function, and any initialisation error.
// NOTE(review): when newDao fails, d wraps a nil *dao and therefore is not a
// nil interface; callers should test err, not d.
func New(r *redis.Redis, mc *memcache.Memcache, db *sql.DB) (d Dao, cf func(), err error) {
	impl, cleanup, initErr := newDao(r, mc, db)
	return impl, cleanup, initErr
}
// newDao assembles a dao from the given clients and from application.toml.
//
// DemoExpire is read from application.toml and stored as whole seconds. The
// returned cleanup function is the dao's own Close method. On a configuration
// error d and cf are nil.
func newDao(r *redis.Redis, mc *memcache.Memcache, db *sql.DB) (d *dao, cf func(), err error) {
	var appCfg struct {
		DemoExpire xtime.Duration
	}
	if err = paladin.Get("application.toml").UnmarshalTOML(&appCfg); err != nil {
		return nil, nil, err
	}

	expireSeconds := int32(time.Duration(appCfg.DemoExpire) / time.Second)
	d = &dao{
		db:         db, // the official documentation initialises the pool right here instead
		redis:      r,
		mc:         mc,
		cache:      fanout.New("cache"),
		demoExpire: expireSeconds,
	}
	return d, d.Close, nil
}
// Close shuts down the fanout cache owned by the dao. The db, redis and mc
// clients are not closed here.
func (d *dao) Close() {
	d.cache.Close()
}
// Ping is a health-check hook. It currently performs no check and always
// reports success.
func (d *dao) Ping(ctx context.Context) (err error) {
	err = nil
	return err
}
创建测试数据库
-- Create and select the demo database used by this tutorial.
create database kratos_demo;
use kratos_demo;
-- users: one row per user. uid is the auto-increment primary key;
-- uptime and addtime are written by the DAO as Unix timestamps in seconds.
CREATE TABLE `users` (
`uid` int(10) unsigned NOT NULL AUTO_INCREMENT,
`nickname` varchar(100) NOT NULL DEFAULT '' COMMENT '昵称',
`age` smallint(5) unsigned NOT NULL COMMENT '年龄',
`uptime` int(10) unsigned NOT NULL DEFAULT '0',
`addtime` int(10) unsigned NOT NULL DEFAULT '0',
PRIMARY KEY (`uid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
在model/model.go
文件中添加结构体
// User mirrors one row of the `users` table.
type User struct {
	// Uid is the auto-increment primary key. It is int64 so that it can hold
	// the value returned by LastInsertId and matches the int64 uid used by
	// the DAO and API layers.
	Uid int64
	// Nickname is the display name of the user.
	Nickname string
	// Age is the user's age.
	Age int32
	// Uptime is the last-update time as a Unix timestamp in seconds.
	Uptime int32
	// Addtime is the creation time as a Unix timestamp in seconds.
	Addtime int32
}
在dao/dao.go
中新增四个接口
// Dao is the data-access interface, extended with four user operations.
type Dao interface {
	// Close releases the resources held by the implementation.
	Close()
	// Ping reports whether the underlying resources are reachable.
	Ping(ctx context.Context) (err error)
	// Article returns the article identified by id.
	// bts: -nullcache=&model.Article{ID:-1} -check_null_code=$!=nil&&$.ID==-1
	Article(c context.Context, id int64) (*model.Article, error)

	// User operations added by this tutorial.

	// AddUser inserts a user and returns the stored record.
	AddUser(c context.Context, nickname string, age int32) (user *model.User, err error)
	// UpdateUser updates nickname and age of user uid and returns the affected row count.
	UpdateUser(c context.Context, uid int64, nickname string, age int32) (row int64, err error)
	// GetUser returns the user identified by uid.
	GetUser(c context.Context, uid int64) (user *model.User, err error)
	// GetUserList returns all users.
	GetUserList(c context.Context) (userlist []*model.User, err error)
}
新增文件dao/dao.user.go
, 实现四个接口
package dao
import (
"context"
"dbserver/internal/model"
"fmt"
"github.com/go-kratos/kratos/pkg/database/sql"
"github.com/go-kratos/kratos/pkg/log"
"time"
)
// AddUser inserts a new row into `users` and returns the stored record.
//
// uptime and addtime are both set to the current Unix time in seconds. The
// returned user carries the auto-increment id reported by the driver. On any
// failure the returned user is nil.
func (d *dao) AddUser(c context.Context, nickname string, age int32) (user *model.User, err error) {
	const query = "INSERT INTO `users`(uid,nickname,age,uptime,addtime) VALUES(null,?,?,?,?);"

	now := time.Now().Unix()
	res, err := d.db.Exec(c, query, nickname, age, now, now)
	if err != nil {
		log.Error("db.Exec(%s) error(%v)", query, err)
		return nil, err
	}

	user = &model.User{
		Nickname: nickname,
		Age:      age,
		Uptime:   int32(now),
		Addtime:  int32(now),
	}
	// The row is already written at this point; wrap the error so callers can
	// tell "inserted but id unknown" apart from a failed INSERT.
	if user.Uid, err = res.LastInsertId(); err != nil {
		log.Error("res.LastInsertId() error(%v)", err)
		return nil, fmt.Errorf("add user: read last insert id: %w", err)
	}
	return user, nil
}
// UpdateUser sets nickname and age of the user identified by uid and stamps
// uptime with the current Unix time in seconds.
//
// It returns the number of rows affected. Both a failed statement and a
// failure to read the affected-row count are reported as (0, err).
func (d *dao) UpdateUser(c context.Context, uid int64, nickname string, age int32) (row int64, err error) {
	const query = "UPDATE `users` SET nickname=?,age=?,uptime=? WHERE uid=?;"

	now := time.Now().Unix()
	res, err := d.db.Exec(c, query, nickname, age, now, uid)
	if err != nil {
		log.Error("db.Exec(%s) error(%v)", query, err)
		return 0, err
	}
	// Propagate the RowsAffected error instead of masking it with nil.
	if row, err = res.RowsAffected(); err != nil {
		log.Error("res.RowsAffected() error(%v)", err)
		return 0, err
	}
	return row, nil
}
// GetUser loads the user identified by uid.
//
// When no row matches, it returns a zero-valued user and a nil error (the
// existing contract of this method). On any other error the returned user is
// nil.
func (d *dao) GetUser(c context.Context, uid int64) (user *model.User, err error) {
	// Columns are listed explicitly so Scan does not depend on the table's
	// physical column order.
	const query = "SELECT uid,nickname,age,uptime,addtime FROM `users` WHERE uid=?;"

	user = new(model.User)
	err = d.db.QueryRow(c, query, uid).Scan(&user.Uid, &user.Nickname, &user.Age, &user.Uptime, &user.Addtime)
	if err != nil && err != sql.ErrNoRows {
		log.Error("d.QueryRow error(%v)", err)
		return nil, err
	}
	return user, nil
}
// GetUserList returns every row of `users`.
//
// With no rows the result is an empty, non-nil slice. On a query, scan or
// iteration error the returned list is nil.
func (d *dao) GetUserList(c context.Context) (userlist []*model.User, err error) {
	// Columns are listed explicitly so Scan does not depend on the table's
	// physical column order.
	const query = "SELECT uid,nickname,age,uptime,addtime FROM `users`;"

	rows, err := d.db.Query(c, query)
	if err != nil {
		log.Error("query error(%v)", err)
		return nil, err
	}
	defer rows.Close()

	userlist = make([]*model.User, 0)
	for rows.Next() {
		user := new(model.User)
		if err = rows.Scan(&user.Uid, &user.Nickname, &user.Age, &user.Uptime, &user.Addtime); err != nil {
			log.Error("scan demo log error(%v)", err)
			return nil, err
		}
		userlist = append(userlist, user)
	}
	// Next returns false on both end-of-data and failure; Err tells them apart.
	if err = rows.Err(); err != nil {
		log.Error("rows error(%v)", err)
		return nil, err
	}
	return userlist, nil
}
打开api/api.proto
, 增加测试http接口:
syntax = "proto3";
import "github.com/gogo/protobuf/gogoproto/gogo.proto";
import "google/protobuf/empty.proto";
import "google/api/annotations.proto";
// package 命名使用 {appid}.{version} 的方式, version 形如 v1, v2 ..
package demo.service.v1;
// NOTE: 最后请删除这些无用的注释 (゜-゜)つロ
option go_package = "api";
option (gogoproto.goproto_getters_all) = false;
// Demo is the tutorial service. The methods carrying a google.api.http option
// are also exposed over HTTP GET at the listed path.
// NOTE(review): AddUser and UpdateUser modify data but are mapped to GET so
// they can be tried from a browser; consider POST for real services.
service Demo {
rpc Ping(.google.protobuf.Empty) returns (.google.protobuf.Empty);
rpc SayHello(HelloReq) returns (.google.protobuf.Empty);
rpc SayHelloURL(HelloReq) returns (HelloResp) {
option (google.api.http) = {
get: "/say_hello"
};
};
// AddUser creates a user from nickname and age.
rpc AddUser(AddReq) returns (Response) {
option (google.api.http) = {
get: "/adduser"
};
};
// UpdateUser changes nickname and age of the user identified by uid.
rpc UpdateUser(UpdateReq) returns (Response) {
option (google.api.http) = {
get: "/updateuser"
};
};
// GetUser returns the user identified by uid.
rpc GetUser(GetReq) returns (Response) {
option (google.api.http) = {
get: "/getuser"
};
};
// GetUserList returns all users.
rpc GetUserList(.google.protobuf.Empty) returns (Response) {
option (google.api.http) = {
get: "/getuserlist"
};
};
}
// HelloReq is the request of SayHello and SayHelloURL.
message HelloReq {
// name is bound from the "name" form field and tagged validate:"required".
string name = 1 [(gogoproto.moretags) = 'form:"name" validate:"required"'];
}
// HelloResp is the response of SayHelloURL.
message HelloResp {
// Content is serialised under the JSON key "content".
string Content = 1 [(gogoproto.jsontag) = 'content'];
}
// AddReq is the request of AddUser.
message AddReq {
// nickname is bound from the "nickname" form field and tagged validate:"required".
string nickname = 1 [(gogoproto.moretags) = 'form:"nickname" validate:"required"'];
// age is bound from the "age" form field.
// NOTE(review): "required" on an integer presumably rejects 0; verify against the validator in use.
int32 age = 2 [(gogoproto.moretags) = 'form:"age" validate:"required"'];
}
// UpdateReq is the request of UpdateUser.
message UpdateReq {
// uid selects the user to update; bound from the "uid" form field.
int64 uid = 1 [(gogoproto.moretags) = 'form:"uid" validate:"required"'];
// nickname is the new nickname; bound from the "nickname" form field.
string nickname = 2 [(gogoproto.moretags) = 'form:"nickname" validate:"required"'];
// age is the new age; bound from the "age" form field.
int32 age = 3 [(gogoproto.moretags) = 'form:"age" validate:"required"'];
}
// GetReq is the request of GetUser.
message GetReq {
// uid selects the user to load; bound from the "uid" form field.
int64 uid = 1 [(gogoproto.moretags) = 'form:"uid" validate:"required"'];
}
// Response is the shared response of the user RPCs. The tutorial's service
// code fills Content with a JSON string or a plain-text message.
message Response {
// Content is serialised under the JSON key "content".
string Content = 1 [(gogoproto.jsontag) = 'content'];
}
打开internal/service/service.go
, 增加接口实现:
// AddUser creates a user through the dao and returns the stored record
// encoded as a JSON string in reply.Content.
//
// A dao failure or a JSON encoding failure is returned as err with a nil reply.
func (s *Service) AddUser(ctx context.Context, req *pb.AddReq) (reply *pb.Response, err error) {
	fmt.Printf("AddUser: %s, %d", req.Nickname, req.Age)
	user, err := s.dao.AddUser(ctx, req.Nickname, req.Age)
	if err != nil {
		fmt.Printf("AddUser %s, %d Error", req.Nickname, req.Age)
		return nil, err
	}
	// Report an encoding failure instead of answering with an empty content.
	res, err := json.Marshal(user)
	if err != nil {
		return nil, err
	}
	return &pb.Response{Content: string(res)}, nil
}
// UpdateUser updates a user's nickname and age through the dao and reports
// the number of affected rows as text in reply.Content.
//
// On a dao failure the reply is nil and the dao error is returned.
func (s *Service) UpdateUser(ctx context.Context, req *pb.UpdateReq) (reply *pb.Response, err error) {
	fmt.Printf("UpdateUser: %s, %d", req.Nickname, req.Age)

	affected, err := s.dao.UpdateUser(ctx, req.Uid, req.Nickname, req.Age)
	if err != nil {
		fmt.Printf("UpdateUser %s, %d Error", req.Nickname, req.Age)
		return nil, err
	}

	content := fmt.Sprintf("更新行数: %d", affected)
	return &pb.Response{Content: content}, nil
}
// GetUser loads one user through the dao and returns it encoded as a JSON
// string in reply.Content.
//
// A dao failure or a JSON encoding failure is returned as err with a nil reply.
func (s *Service) GetUser(ctx context.Context, req *pb.GetReq) (reply *pb.Response, err error) {
	fmt.Printf("GetUser: %d", req.Uid)
	user, err := s.dao.GetUser(ctx, req.Uid)
	if err != nil {
		// req.Uid is an int64, so it must be printed with %d; %s would emit
		// "%!s(int64=...)".
		fmt.Printf("GetUser %d Error", req.Uid)
		return nil, err
	}
	// Report an encoding failure instead of answering with an empty content.
	res, err := json.Marshal(user)
	if err != nil {
		return nil, err
	}
	return &pb.Response{Content: string(res)}, nil
}
// GetUserList loads all users through the dao and returns them encoded as a
// JSON array string in reply.Content.
//
// A dao failure or a JSON encoding failure is returned as err with a nil reply.
func (s *Service) GetUserList(ctx context.Context, req *empty.Empty) (reply *pb.Response, err error) {
	fmt.Printf("GetUserList")
	userlist, err := s.dao.GetUserList(ctx)
	if err != nil {
		fmt.Printf("GetUserList Error")
		return nil, err
	}
	// Report an encoding failure instead of answering with an empty content.
	res, err := json.Marshal(userlist)
	if err != nil {
		return nil, err
	}
	return &pb.Response{Content: string(res)}, nil
}
进入api
目录, 重新生成pb文件
kratos tool protoc
运行项目:
kratos run
打开浏览器:
添加用户:
http://localhost:8000/adduser?nickname=soul&age=22
这里为了方便输出, 我直接返回字符串, 实际项目中应该要在pb文件中定义对应的结构体返回给客户端
返回信息:
{
"code": 0,
"message": "0",
"ttl": 1,
"data": {
"content": "{\"Uid\":3,\"Nickname\":\"soul\",\"Age\":22,\"Uptime\":1608102563,\"Addtime\":1608102563}"
}
}
更新用户信息:
http://localhost:8000/updateuser?uid=3&nickname=soul&age=22
返回信息:
{
"code": 0,
"message": "0",
"ttl": 1,
"data": {
"content": "更新行数: 1"
}
}
获取单个用户信息:
http://localhost:8000/getuser?uid=3
返回信息:
{
"code": 0,
"message": "0",
"ttl": 1,
"data": {
"content": "{\"Uid\":3,\"Nickname\":\"soul\",\"Age\":22,\"Uptime\":1608102563,\"Addtime\":1608102563}"
}
}
获取用户列表:
http://localhost:8000/getuserlist
返回信息:
{
"code": 0,
"message": "0",
"ttl": 1,
"data": {
"content": "[{\"Uid\":1,\"Nickname\":\"soul\",\"Age\":22,\"Uptime\":1608102449,\"Addtime\":1608102449},{\"Uid\":2,\"Nickname\":\"soul\",\"Age\":22,\"Uptime\":1608102514,\"Addtime\":1608102514},{\"Uid\":3,\"Nickname\":\"soul\",\"Age\":22,\"Uptime\":1608102563,\"Addtime\":1608102563}]"
}
}
到此, 已实现基本的数据库操作
事务
kratos/pkg/database/sql
包支持事务操作,具体操作示例如下:
开启一个事务:
// Begin takes the request context (named c here, as in the dao methods) and
// returns the transaction together with an error; there is no tx.Error field.
tx, err := d.db.Begin(c)
if err != nil {
	log.Error("db begin transcation failed, err=%+v", err)
	return
}
在事务中执行语句:
// Run a statement inside the transaction. When either step fails the
// transaction should be rolled back before returning.
res, err := tx.Exec(_demoSQL, did)
if err != nil {
	return
}
// RowsAffected returns the count together with an error.
rows, err := res.RowsAffected()
if err != nil {
	return
}
提交事务:
// Commit returns the error directly.
if err = tx.Commit(); err != nil {
	log.Error("db commit transcation failed, err=%+v", err)
}
回滚事务:
// Rollback returns the error directly; log that same error.
if err = tx.Rollback(); err != nil {
	log.Error("db rollback failed, err=%+v", err)
}
事务相关就不写例子啦, 各位可自行尝试.
更多推荐
所有评论(0)