开始使用

由于kratos使用了wire依赖注入框架,开始使用前,建议先了解相关教程:依赖注入wire使用详解

配置

打开配置文件configs/db.toml(文件名必须与下文NewDB中paladin.Get("db.toml")读取的名称一致),修改为自己的服务器配置:

# [Client] holds the MySQL connection-pool settings.
[Client]
	# Connection address of the MySQL server.
	addr = "127.0.0.1:3306"
	# DSN used for writes; replace {user}, {password} and {database}.
	dsn = "{user}:{password}@tcp(127.0.0.1:3306)/{database}?timeout=1s&readTimeout=1s&writeTimeout=1s&parseTime=true&loc=Local&charset=utf8mb4,utf8"
	# DSN list used for reads; when set, read operations prefer these connections.
	# A single entry is enough.
	readDSN = ["{user}:{password}@tcp(127.0.0.2:3306)/{database}?timeout=1s&readTimeout=1s&writeTimeout=1s&parseTime=true&loc=Local&charset=utf8mb4,utf8"]
	# Maximum number of connections in the pool.
	active = 20
	# Number of idle connections kept in the pool.
	idle = 10
	# Presumably how long an idle connection may live before being dropped - TODO confirm.
	idleTimeout ="4h"
	# Timeouts; presumably applied to queries, execs and transactions respectively - TODO confirm.
	queryTimeout = "200ms"
	execTimeout = "300ms"
	tranTimeout = "400ms"

在该配置文件中我们可以配置mysql的读和写的dsn、连接地址addr、连接池的闲置连接数idle、最大连接数active以及各类超时。

如果配置了readDSN,在进行读操作的时候会优先使用readDSN的连接,readDSN可以只配一个地址。

初始化

打开文件internal/dao/db.go

package dao

import (
	"context"

	"dbserver/internal/model"
	"github.com/go-kratos/kratos/pkg/conf/paladin"
	"github.com/go-kratos/kratos/pkg/database/sql"
)

// NewDB builds the MySQL connection pool from the "Client" table of db.toml.
// It returns the pool, a cleanup function that closes the pool, and any error
// hit while loading or decoding the configuration.
func NewDB() (db *sql.DB, cf func(), err error) {
	var (
		poolCfg sql.Config
		file    paladin.TOML
	)
	// Load the configuration file, then decode its [Client] table.
	if err = paladin.Get("db.toml").Unmarshal(&file); err != nil {
		return nil, nil, err
	}
	if err = file.Get("Client").UnmarshalTOML(&poolCfg); err != nil {
		return nil, nil, err
	}
	// NewMySQL initialises the connection-pool object.
	db = sql.NewMySQL(&poolCfg)
	cf = func() { db.Close() }
	return db, cf, nil
}

// RawArticle is the placeholder for loading an article by id from the
// database. It performs no query yet and returns a nil article and nil error.
func (d *dao) RawArticle(ctx context.Context, id int64) (art *model.Article, err error) {
	// get data from db
	return nil, nil
}

打开文件internal/dao/dao.go
在该文件中的New方法接收外部数据库连接池对象db *sql.DB, 也可以像官方文档说的, 直接在dao中初始化
这里涉及到依赖注入, 具体可以查看di/wire_gen.go文件
依赖注入相关可查看这篇文章: 依赖注入wire使用详解

package dao

import (
	"context"
	"time"

	"dbserver/internal/model"
	"github.com/go-kratos/kratos/pkg/cache/memcache"
	"github.com/go-kratos/kratos/pkg/cache/redis"
	"github.com/go-kratos/kratos/pkg/conf/paladin"
	"github.com/go-kratos/kratos/pkg/database/sql"
	"github.com/go-kratos/kratos/pkg/sync/pipeline/fanout"
	xtime "github.com/go-kratos/kratos/pkg/time"

	"github.com/google/wire"
)
// Provider is the wire provider set of this package: it bundles the
// constructors New, NewDB, NewRedis and NewMC for dependency injection.
var Provider = wire.NewSet(New, NewDB, NewRedis, NewMC)

//go:generate kratos tool genbts
// Dao is the data-access interface returned by New.
type Dao interface {
	// Close releases the resources held by the implementation.
	Close()
	// Ping checks the underlying resources and returns an error on failure.
	Ping(ctx context.Context) (err error)
	// bts: -nullcache=&model.Article{ID:-1} -check_null_code=$!=nil&&$.ID==-1
	Article(c context.Context, id int64) (*model.Article, error)
}

// dao is the concrete Dao implementation; it holds the storage clients
// handed to newDao.
type dao struct {
	// db is the MySQL connection pool.
	db          *sql.DB
	// redis is the redis client.
	redis       *redis.Redis
	// mc is the memcache client.
	mc          *memcache.Memcache
	// cache is the fanout instance created in newDao under the name "cache".
	cache *fanout.Fanout
	// demoExpire is the DemoExpire setting from application.toml, in whole seconds.
	demoExpire int32
}

// New builds the Dao from connection objects supplied by the caller: the
// redis client, the memcache client and the MySQL pool.
// It returns the Dao, a cleanup function, and any construction error.
func New(r *redis.Redis, mc *memcache.Memcache, db *sql.DB) (d Dao, cf func(), err error) {
	var impl *dao
	impl, cf, err = newDao(r, mc, db)
	if err != nil {
		// Return an untyped nil: a nil *dao wrapped in the Dao interface
		// would compare != nil for the caller.
		return nil, nil, err
	}
	return impl, cf, nil
}

// newDao assembles a dao from the given connection objects and the
// DemoExpire setting read from application.toml. The returned cleanup
// function is the dao's own Close method.
func newDao(r *redis.Redis, mc *memcache.Memcache, db *sql.DB) (d *dao, cf func(), err error) {
	var appCfg struct {
		DemoExpire xtime.Duration
	}
	if err = paladin.Get("application.toml").UnmarshalTOML(&appCfg); err != nil {
		return nil, nil, err
	}
	// The expiry is kept as a whole number of seconds.
	expireSeconds := int32(time.Duration(appCfg.DemoExpire) / time.Second)
	d = &dao{
		db:         db, // the official docs initialise the pool right here instead
		redis:      r,
		mc:         mc,
		cache:      fanout.New("cache"),
		demoExpire: expireSeconds,
	}
	return d, d.Close, nil
}

// Close closes the fanout cache. The db, redis and mc handles are not closed
// by this method.
func (d *dao) Close() {
	d.cache.Close()
}

// Ping is the health-check hook of the dao. It currently checks nothing and
// always returns nil.
func (d *dao) Ping(ctx context.Context) (err error) {
	return nil
}

创建测试数据库

-- Create the demo database and switch to it.
create database kratos_demo;
use kratos_demo;

-- users: one row per user.
-- uptime and addtime are written by the DAO as Unix timestamps in seconds
-- (last update time and creation time).
CREATE TABLE `users` (
  `uid` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `nickname` varchar(100) NOT NULL DEFAULT '' COMMENT '昵称',
  `age` smallint(5) unsigned NOT NULL COMMENT '年龄',
  `uptime` int(10) unsigned NOT NULL DEFAULT '0',
  `addtime` int(10) unsigned NOT NULL DEFAULT '0',
  PRIMARY KEY (`uid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

在internal/model/model.go文件中添加结构体

// User mirrors one row of the `users` table.
//
// NOTE(review): Uid is int32 while the Dao methods take uid as int64 and the
// `uid` column is an unsigned int; values must be converted (and range-checked)
// when stored here.
type User struct {
	// Uid is the auto-increment primary key.
	Uid int32
	// Nickname is the user's nickname.
	Nickname string
	// Age is the user's age.
	Age int32
	// Uptime is the last update time as a Unix timestamp in seconds.
	Uptime int32
	// Addtime is the creation time as a Unix timestamp in seconds.
	Addtime int32
}

在internal/dao/dao.go的Dao接口中新增四个方法

// Dao is the data-access interface, extended with the user operations.
type Dao interface {
	// Close releases the resources held by the implementation.
	Close()
	// Ping checks the underlying resources and returns an error on failure.
	Ping(ctx context.Context) (err error)
	// bts: -nullcache=&model.Article{ID:-1} -check_null_code=$!=nil&&$.ID==-1
	Article(c context.Context, id int64) (*model.Article, error)
	// Newly added user methods.
	// AddUser inserts a user with the given nickname and age and returns it.
	AddUser(c context.Context, nickname string, age int32) (user *model.User, err error)
	// UpdateUser updates nickname and age of the user identified by uid and
	// returns the number of affected rows.
	UpdateUser(c context.Context, uid int64, nickname string, age int32) (row int64, err error)
	// GetUser loads the user identified by uid.
	GetUser(c context.Context, uid int64) (user *model.User, err error)
	// GetUserList loads every user in the table.
	GetUserList(c context.Context) (userlist []*model.User, err error)
}

新增文件internal/dao/dao.user.go, 实现上述四个方法

package dao

import (
	"context"
	"dbserver/internal/model"
	"fmt"
	"github.com/go-kratos/kratos/pkg/database/sql"
	"github.com/go-kratos/kratos/pkg/log"
	"time"
)
// AddUser inserts a new row into `users` with the given nickname and age,
// using the current Unix time for both uptime and addtime, and returns the
// stored user including its generated uid.
//
// It returns an error when the insert fails, when the generated id cannot be
// read, or when the id does not fit into model.User.Uid (int32).
func (d *dao) AddUser(c context.Context, nickname string, age int32) (user *model.User, err error) {
	const query = "INSERT INTO `users`(uid,nickname,age,uptime,addtime) VALUES(null,?,?,?,?);"

	now := time.Now().Unix()
	res, err := d.db.Exec(c, query, nickname, age, now, now)
	if err != nil {
		log.Error("db.Exec(%s) error(%v)", query, err)
		return nil, err
	}
	// LastInsertId yields an int64 and may fail; do not discard either fact.
	id, err := res.LastInsertId()
	if err != nil {
		log.Error("res.LastInsertId() error(%v)", err)
		return nil, err
	}
	uid := int32(id)
	if int64(uid) != id {
		// The column is an unsigned int, so ids above the int32 range are possible.
		err = fmt.Errorf("inserted uid %d does not fit model.User.Uid (int32)", id)
		log.Error("AddUser error(%v)", err)
		return nil, err
	}

	// NOTE: the timestamps are narrowed to int32 because that is the field type.
	return &model.User{
		Uid:      uid,
		Nickname: nickname,
		Age:      age,
		Uptime:   int32(now),
		Addtime:  int32(now),
	}, nil
}
// UpdateUser sets nickname, age and uptime (current Unix time) of the user
// identified by uid and returns the number of affected rows.
//
// It returns an error when the update fails or when the affected-row count
// cannot be obtained.
func (d *dao) UpdateUser(c context.Context, uid int64, nickname string, age int32) (row int64, err error) {
	const query = "UPDATE `users` SET nickname=?,age=?,uptime=? WHERE uid=?;"

	now := time.Now().Unix()
	res, err := d.db.Exec(c, query, nickname, age, now, uid)
	if err != nil {
		log.Error("db.Exec(%s) error(%v)", query, err)
		return 0, err
	}

	// Propagate the RowsAffected error instead of silently reporting success.
	if row, err = res.RowsAffected(); err != nil {
		log.Error("res.RowsAffected() error(%v)", err)
		return 0, err
	}
	return row, nil
}
// GetUser loads the user identified by uid.
//
// When no row matches, it returns a zero-valued user and a nil error (the
// original contract, kept for callers). On any other error it returns a nil
// user together with the error.
func (d *dao) GetUser(c context.Context, uid int64) (user *model.User, err error) {
	// Columns are listed explicitly so the positional Scan below does not
	// depend on the table's column order.
	const query = "SELECT uid,nickname,age,uptime,addtime FROM `users` WHERE uid=?;"

	u := new(model.User)
	err = d.db.QueryRow(c, query, uid).Scan(&u.Uid, &u.Nickname, &u.Age, &u.Uptime, &u.Addtime)
	if err != nil && err != sql.ErrNoRows {
		log.Error("d.QueryRow error(%v)", err)
		return nil, err
	}
	return u, nil
}
// GetUserList loads every row of `users`.
//
// On success it returns a non-nil slice (empty when the table is empty). On a
// query, scan or iteration error it returns a nil slice and the error.
func (d *dao) GetUserList(c context.Context) (userlist []*model.User, err error) {
	// Columns are listed explicitly so the positional Scan below does not
	// depend on the table's column order.
	const query = "SELECT uid,nickname,age,uptime,addtime FROM `users`;"

	rows, err := d.db.Query(c, query)
	if err != nil {
		log.Error("query  error(%v)", err)
		return nil, err
	}
	defer rows.Close()

	// Non-nil empty slice so an empty table marshals to [] rather than null.
	userlist = make([]*model.User, 0)
	for rows.Next() {
		user := new(model.User)
		if err = rows.Scan(&user.Uid, &user.Nickname, &user.Age, &user.Uptime, &user.Addtime); err != nil {
			log.Error("scan demo log error(%v)", err)
			return nil, err
		}
		userlist = append(userlist, user)
	}
	// Next returning false can also mean the iteration failed midway.
	if err = rows.Err(); err != nil {
		log.Error("rows.Err() error(%v)", err)
		return nil, err
	}
	return userlist, nil
}

打开api/api.proto, 增加测试http接口:

syntax = "proto3";

import "github.com/gogo/protobuf/gogoproto/gogo.proto";
import "google/protobuf/empty.proto";
import "google/api/annotations.proto";

// Package naming follows the {appid}.{version} pattern; version looks like v1, v2 ...
package demo.service.v1;

// NOTE: please delete these useless comments when you are done (゜-゜)つロ

option go_package = "api";
option (gogoproto.goproto_getters_all) = false;

// Demo is the demo service; the rpcs carrying a google.api.http option are
// also exposed over HTTP on the given path.
service Demo {
  rpc Ping(.google.protobuf.Empty) returns (.google.protobuf.Empty);
  rpc SayHello(HelloReq) returns (.google.protobuf.Empty);
  rpc SayHelloURL(HelloReq) returns (HelloResp) {
    option (google.api.http) = {
      get: "/say_hello"
    };
  };
  // AddUser creates a user.
  // NOTE(review): this mutates data but is bound to GET, presumably so it can
  // be called from a browser address bar; consider POST in a real service.
  rpc AddUser(AddReq) returns (Response) {
    option (google.api.http) = {
      get: "/adduser"
    };
  };
  // UpdateUser updates a user's nickname and age.
  // NOTE(review): also a mutation bound to GET; see AddUser.
  rpc UpdateUser(UpdateReq) returns (Response) {
    option (google.api.http) = {
      get: "/updateuser"
    };
  };
  // GetUser returns a single user by uid.
  rpc GetUser(GetReq) returns (Response) {
    option (google.api.http) = {
      get: "/getuser"
    };
  };
  // GetUserList returns all users.
  rpc GetUserList(.google.protobuf.Empty) returns (Response) {
    option (google.api.http) = {
      get: "/getuserlist"
    };
  };
}

// HelloReq is the request of SayHello and SayHelloURL.
message HelloReq {
  // name is bound from the "name" form parameter and is required.
  string name = 1 [(gogoproto.moretags) = 'form:"name" validate:"required"'];
}

// HelloResp is the response of SayHelloURL.
message HelloResp {
  // Content is serialised under the JSON key "content".
  string Content = 1 [(gogoproto.jsontag) = 'content'];
}

// AddReq is the request of AddUser.
message AddReq {
  // nickname is bound from the "nickname" form parameter and is required.
  string nickname = 1 [(gogoproto.moretags) = 'form:"nickname" validate:"required"'];
  // age is bound from the "age" form parameter and is required.
  // NOTE(review): a "required" rule presumably rejects age=0 - confirm against the validator in use.
  int32 age = 2 [(gogoproto.moretags) = 'form:"age" validate:"required"'];
}

// UpdateReq is the request of UpdateUser.
message UpdateReq {
  // uid identifies the user to update; bound from the "uid" form parameter, required.
  int64 uid = 1 [(gogoproto.moretags) = 'form:"uid" validate:"required"'];
  // nickname is the new nickname; bound from the "nickname" form parameter, required.
  string nickname = 2 [(gogoproto.moretags) = 'form:"nickname" validate:"required"'];
  // age is the new age; bound from the "age" form parameter, required.
  int32 age = 3 [(gogoproto.moretags) = 'form:"age" validate:"required"'];
}

// GetReq is the request of GetUser.
message GetReq {
  // uid identifies the user to load; bound from the "uid" form parameter, required.
  int64 uid = 1 [(gogoproto.moretags) = 'form:"uid" validate:"required"'];
}

// Response is the shared response of the user rpcs; the result is carried as
// a plain string.
message Response {
  // Content is serialised under the JSON key "content".
  string Content = 1 [(gogoproto.jsontag) = 'content'];
}

打开internal/service/service.go, 增加接口实现:


// AddUser creates a user through the dao and returns it JSON-encoded in
// Response.Content. It returns a nil reply and the error when the dao call or
// the JSON encoding fails.
func (s *Service) AddUser(ctx context.Context, req *pb.AddReq) (reply *pb.Response, err error) {
	fmt.Printf("AddUser: %s, %d", req.Nickname, req.Age)
	user, err := s.dao.AddUser(ctx, req.Nickname, req.Age)
	if err != nil {
		fmt.Printf("AddUser %s, %d Error", req.Nickname, req.Age)
		return nil, err
	}
	// Do not discard the encoding error; an empty Content would look like success.
	content, err := json.Marshal(user)
	if err != nil {
		return nil, err
	}
	return &pb.Response{Content: string(content)}, nil
}

// UpdateUser updates the user's nickname and age through the dao and reports
// the number of affected rows as text in Response.Content. On a dao error it
// returns a nil reply and that error.
func (s *Service) UpdateUser(ctx context.Context, req *pb.UpdateReq) (reply *pb.Response, err error) {
	fmt.Printf("UpdateUser: %s, %d", req.Nickname, req.Age)
	affected, err := s.dao.UpdateUser(ctx, req.Uid, req.Nickname, req.Age)
	if err != nil {
		fmt.Printf("UpdateUser %s, %d Error", req.Nickname, req.Age)
		return nil, err
	}
	return &pb.Response{Content: fmt.Sprintf("更新行数: %d", affected)}, nil
}

// GetUser loads a user by uid through the dao and returns it JSON-encoded in
// Response.Content. It returns a nil reply and the error when the dao call or
// the JSON encoding fails.
func (s *Service) GetUser(ctx context.Context, req *pb.GetReq) (reply *pb.Response, err error) {
	fmt.Printf("GetUser: %d", req.Uid)
	user, err := s.dao.GetUser(ctx, req.Uid)
	if err != nil {
		// req.Uid is an int64, so the verb must be %d (was %s).
		fmt.Printf("GetUser %d Error", req.Uid)
		return nil, err
	}
	// Do not discard the encoding error; an empty Content would look like success.
	content, err := json.Marshal(user)
	if err != nil {
		return nil, err
	}
	return &pb.Response{Content: string(content)}, nil
}

// GetUserList loads all users through the dao and returns them as a JSON
// array in Response.Content. The request message carries no data. It returns
// a nil reply and the error when the dao call or the JSON encoding fails.
func (s *Service) GetUserList(ctx context.Context, req *empty.Empty) (reply *pb.Response, err error) {
	fmt.Printf("GetUserList")
	userlist, err := s.dao.GetUserList(ctx)
	if err != nil {
		fmt.Printf("GetUserList Error")
		return nil, err
	}
	// Do not discard the encoding error; an empty Content would look like success.
	content, err := json.Marshal(userlist)
	if err != nil {
		return nil, err
	}
	return &pb.Response{Content: string(content)}, nil
}

进入api目录, 重新生成pb文件

kratos tool protoc

运行项目:

kratos run

打开浏览器:

添加用户:

http://localhost:8000/adduser?nickname=soul&age=22

这里为了方便输出, 我直接返回字符串, 实际项目中应该要在pb文件中定义对应的结构体返回给客户端

返回信息:

{
    "code": 0,
    "message": "0",
    "ttl": 1,
    "data": {
        "content": "{\"Uid\":3,\"Nickname\":\"soul\",\"Age\":22,\"Uptime\":1608102563,\"Addtime\":1608102563}"
    }
}

更新用户信息:

http://localhost:8000/updateuser?uid=3&nickname=soul&age=22

返回信息:

{
    "code": 0,
    "message": "0",
    "ttl": 1,
    "data": {
        "content": "更新行数: 1"
    }
}

获取单个用户信息:

http://localhost:8000/getuser?uid=3

返回信息:

{
    "code": 0,
    "message": "0",
    "ttl": 1,
    "data": {
        "content": "{\"Uid\":3,\"Nickname\":\"soul\",\"Age\":22,\"Uptime\":1608102563,\"Addtime\":1608102563}"
    }
}

获取用户列表:

http://localhost:8000/getuserlist

返回信息:

{
    "code": 0,
    "message": "0",
    "ttl": 1,
    "data": {
        "content": "[{\"Uid\":1,\"Nickname\":\"soul\",\"Age\":22,\"Uptime\":1608102449,\"Addtime\":1608102449},{\"Uid\":2,\"Nickname\":\"soul\",\"Age\":22,\"Uptime\":1608102514,\"Addtime\":1608102514},{\"Uid\":3,\"Nickname\":\"soul\",\"Age\":22,\"Uptime\":1608102563,\"Addtime\":1608102563}]"
    }
}

到此, 已实现基本的数据库操作

事务

kratos/pkg/database/sql包支持事务操作,具体操作示例如下:

开启一个事务:

// Begin takes the request context (c) and returns the transaction handle
// together with an error; there is no Error field on the handle.
tx, err := d.db.Begin(c)
if err != nil {
    log.Error("db begin transcation failed, err=%+v", err)
    return
}

在事务中执行语句:

// Run a statement inside the transaction.
res, err := tx.Exec(_demoSQL, did)
if err != nil {
    return
}
// RowsAffected returns the count and an error; both must be received.
rows, err := res.RowsAffected()
if err != nil {
    return
}

提交事务:

// Commit returns the error directly.
if err = tx.Commit(); err != nil {
    log.Error("db commit transcation failed, err=%+v", err)
}

回滚事务:

// Rollback returns the error directly; log that error (the original logged an
// undefined rollbackErr variable).
if err = tx.Rollback(); err != nil {
    log.Error("db rollback failed, err=%+v", err)
}

事务相关就不写例子啦, 各位可自行尝试.

本项目示例源码: https://download.csdn.net/download/uisoul/13704134

Logo

更多推荐