跳转到内容

性能优化

Pie 提供了多种性能优化功能,帮助您构建高性能的数据库应用程序。

// Tune the connection pool through the engine's functional options.
// NOTE(review): err is not checked in this snippet — callers should handle it
// before using engine.
engine, err := pie.NewEngine(ctx, "mydb",
pie.WithMaxPoolSize(100), // maximum number of connections
pie.WithMinPoolSize(10), // minimum number of connections
pie.WithMaxIdleTime(30*time.Minute), // maximum idle time
pie.WithConnectTimeout(10*time.Second), // connection timeout
pie.WithSocketTimeout(30*time.Second), // socket timeout
)
// monitorConnectionPool starts a background goroutine that, every 30 seconds,
// reads the engine's connection statistics and logs the active, idle and
// total connection counts.
//
// The goroutine has no stop condition; it keeps ticking for as long as the
// process runs.
func monitorConnectionPool(engine *pie.Engine) {
	const interval = 30 * time.Second

	// report logs one snapshot of the pool statistics.
	report := func() {
		poolStats := engine.GetConnectionStats()
		log.Printf("Connection Pool Stats:")
		log.Printf(" Active: %d", poolStats.ActiveConnections)
		log.Printf(" Idle: %d", poolStats.IdleConnections)
		log.Printf(" Total: %d", poolStats.TotalConnections)
	}

	go func() {
		tick := time.NewTicker(interval)
		defer tick.Stop()
		for range tick.C {
			report()
		}
	}()
}
// createOptimizedIndexes creates indexes for commonly used queries on the
// "users" collection: a single-field index on email, a compound index on
// status (ascending) and created_at (descending), and an index on email with
// a partial filter expression of status == "active".
//
// It stops at, and returns, the first error reported by CreateIndex.
func createOptimizedIndexes() error {
	idx := pie.MustIndexes(engine)

	// Single-field index.
	if err := idx.CreateIndex(ctx, "users", bson.D{{"email", 1}}); err != nil {
		return err
	}

	// Compound index.
	statusThenNewest := bson.D{
		{"status", 1},
		{"created_at", -1},
	}
	if err := idx.CreateIndex(ctx, "users", statusThenNewest); err != nil {
		return err
	}

	// Partial index.
	activeOnly := &options.IndexOptions{
		PartialFilterExpression: bson.D{{"status", "active"}},
	}
	return idx.CreateIndex(ctx, "users", bson.D{{"email", 1}}, activeOnly)
}
// getUsersOptimized returns the users whose status is "active", selecting
// only the name, email and status fields instead of whole documents.
func getUsersOptimized() ([]User, error) {
	query := pie.Table[User](engine).
		Select("name", "email", "status"). // select only the fields that are needed
		Where("status", "active")
	return query.Find(ctx)
}
// getRecentUsers returns at most limit users whose status is "active",
// ordered by created_at descending. The limit keeps the query from returning
// too much data.
func getRecentUsers(limit int) ([]User, error) {
	query := pie.Table[User](engine).
		Where("status", "active").
		OrderByDesc("created_at").
		Limit(limit)
	return query.Find(ctx)
}
// setupMultiLevelCache configures multi-level caching: it builds a Ristretto
// cache (with a nil config) and a Redis cache pointing at localhost:6379, then
// registers both with the engine in that order.
//
// It returns the first constructor error, in which case nothing is registered.
func setupMultiLevelCache() error {
	local, err := pie.NewRistrettoCache(nil)
	if err != nil {
		return err
	}

	redisCfg := &pie.RedisCacheConfig{Addr: "localhost:6379"}
	remote, err := pie.NewRedisCache(redisCfg)
	if err != nil {
		return err
	}

	engine.UseCache(local, remote)
	return nil
}
// warmupCache runs a fixed list of cached queries ("active_users" and
// "admin_users") so that their results are loaded ahead of time.
//
// Each query gets a one-hour TTL via WithCache and is tagged with its name via
// Cache, matching how the rest of this file pairs WithCache(ttl) with
// Cache(name). A failing query is logged and skipped; it does not stop the
// remaining warm-ups.
func warmupCache(ctx context.Context, engine *pie.Engine) {
	// NOTE(review): both closures build on the same session value. If Where
	// mutates its receiver, the second query would inherit the first query's
	// filter — confirm against the pie session semantics.
	session := pie.Table[User](engine)

	queries := []struct {
		name  string
		query func() *pie.Session[User]
	}{
		{
			name: "active_users",
			query: func() *pie.Session[User] {
				return session.Where("status", "active").WithCache(1 * time.Hour)
			},
		},
		{
			name: "admin_users",
			query: func() *pie.Session[User] {
				return session.Where("role", "admin").WithCache(1 * time.Hour)
			},
		},
	}

	for _, q := range queries {
		users, err := q.query().Cache(q.name).Find(ctx)
		if err != nil {
			log.Printf("Failed to warmup cache for %s: %v", q.name, err)
			continue
		}
		log.Printf("Warmed up cache for %s: %d users", q.name, len(users))
	}
}
// Cache lifetimes, chosen according to how often each kind of data changes.
const (
	// SessionCacheTTL is for session data, which changes frequently.
	SessionCacheTTL = 5 * time.Minute
	// UserCacheTTL is for user records, which change moderately often.
	UserCacheTTL = 10 * time.Minute
	// StatsCacheTTL is for statistics, which change moderately often.
	StatsCacheTTL = 30 * time.Minute
	// ConfigCacheTTL is for configuration, which rarely changes.
	ConfigCacheTTL = time.Hour
)
// getCachedUsers runs a cached query over the User table, choosing the cache
// TTL from dataType:
//
//	"user"    -> UserCacheTTL
//	"config"  -> ConfigCacheTTL
//	"stats"   -> StatsCacheTTL
//	"session" -> SessionCacheTTL
//
// Any other value falls back to five minutes. dataType is also passed to
// Cache (presumably as the cache key — confirm against the pie API).
//
// The result slice is returned directly; the previous any(users) conversion
// did not compile against the []User return type.
func getCachedUsers(ctx context.Context, dataType string) ([]User, error) {
	session := pie.Table[User](engine)

	ttl := 5 * time.Minute // fallback for unrecognised data types
	switch dataType {
	case "user":
		ttl = UserCacheTTL
	case "config":
		ttl = ConfigCacheTTL
	case "stats":
		ttl = StatsCacheTTL
	case "session":
		ttl = SessionCacheTTL
	}

	users, err := session.WithCache(ttl).Cache(dataType).Find(ctx)
	return users, err
}
// batchInsertUsers inserts users in consecutive chunks of at most 1000
// records so that no single InsertMany call grows too large.
//
// It stops at the first failing chunk and returns an error that names the
// inclusive index range of that chunk and wraps the underlying error. An
// empty slice performs no inserts and returns nil.
func batchInsertUsers(users []*User) error {
	const batchSize = 1000

	session := pie.Table[User](engine)
	total := len(users)

	for start := 0; start < total; start += batchSize {
		// Clamp the final chunk to the end of the slice.
		stop := start + batchSize
		if stop > total {
			stop = total
		}
		if _, err := session.InsertMany(ctx, users[start:stop]); err != nil {
			return fmt.Errorf("failed to insert batch %d-%d: %w", start, stop-1, err)
		}
	}
	return nil
}
// batchUpdateUsers applies all updates in one ordered bulk write: each entry
// becomes an UpdateOne that matches on _id and applies update.Data via $set.
// On success it logs the number of modified documents.
//
// An empty updates slice is a no-op that returns nil.
// NOTE(review): the MongoDB Go driver rejects a bulk write with no models;
// this guard assumes pie forwards that error rather than handling it itself.
func batchUpdateUsers(updates []UserUpdate) error {
	if len(updates) == 0 {
		return nil
	}

	bulkWrite := pie.NewBulkWrite[User](engine)
	for _, update := range updates {
		bulkWrite.UpdateOne(
			bson.D{{"_id", update.ID}},
			bson.D{{"$set", update.Data}},
		)
	}

	result, err := bulkWrite.ExecuteOrdered(ctx)
	if err != nil {
		return err
	}
	log.Printf("Updated %d users", result.ModifiedCount)
	return nil
}
// optimizedAggregation runs an aggregation over User and logs result.Data.
//
// The pipeline matches documents with status "active" and a created_at
// condition built from the timestamp one month before now, groups them by
// role with a count ("total") and an average of age ("avgAge"), sorts by
// total descending and keeps the first 10 groups.
func optimizedAggregation() error {
	pipeline := pie.NewAggregate[User](engine)

	// Match first so later stages see less data.
	grouped := pipeline.
		MatchStage().
		Where("status", "active").
		Where("created_at", pie.Gte("created_at", time.Now().AddDate(0, -1, 0))).
		GroupStage().
		By("role", "$role").
		Count("total").
		Avg("avgAge", "$age").
		Done()

	// Cap the number of returned groups.
	result, err := grouped.
		SortStage().Desc("total").
		LimitStage(10).
		Exec(ctx)
	if err != nil {
		return err
	}

	log.Printf("Aggregation result: %+v", result.Data)
	return nil
}
func createAggregationIndexes() error {
indexes := pie.MustIndexes(engine)
// 为聚合查询创建复合索引
err := indexes.CreateIndex(ctx, "users", bson.D{
{"status", 1},
{"created_at", -1},
{"role", 1},
})
if err != nil {
return err
}
return nil
}
// processLargeDataset walks all users with status "active", ordered by
// created_at, through a cursor and hands each one to processUser. Decoding
// one document at a time avoids holding the whole result set in memory.
//
// A document that fails to decode is logged and skipped so one bad record
// does not abort the run. After the loop the cursor's own error is returned,
// so an iteration failure is not mistaken for the end of the data.
func processLargeDataset() error {
	session := pie.Table[User](engine)
	cursor, err := session.
		Where("status", "active").
		OrderBy("created_at").
		FindCursor(ctx)
	if err != nil {
		return err
	}
	defer cursor.Close(ctx)

	for cursor.Next(ctx) {
		var user User
		if err := cursor.Decode(&user); err != nil {
			// Best effort: report the bad document instead of dropping it silently.
			log.Printf("skipping user that failed to decode: %v", err)
			continue
		}
		processUser(&user)
	}

	// Next returns false both at the end of the data and on failure; Err
	// tells the two apart.
	// NOTE(review): assumes FindCursor returns a MongoDB driver cursor (or a
	// wrapper exposing Err) — confirm against the pie API.
	return cursor.Err()
}
// processDataInPages processes users with status "active", ordered by
// created_at, one page of pageSize records at a time, starting at page 1.
// Each record on a page is passed to processUser. It returns nil once a page
// comes back empty, or the first Paginate error.
//
// NOTE(review): a full garbage collection is forced after every page; the Go
// runtime normally schedules collections itself, so consider whether this
// call is needed.
func processDataInPages(pageSize int) error {
	session := pie.Table[User](engine)

	for page := 1; ; page++ {
		params := pie.PaginateParams{
			Page:     page,
			PageSize: pageSize,
		}
		result, err := session.
			Where("status", "active").
			OrderBy("created_at").
			Paginate(ctx, params)
		if err != nil {
			return err
		}
		if len(result.Data) == 0 {
			// No more data.
			return nil
		}

		// Handle the records of the current page.
		for _, user := range result.Data {
			processUser(&user)
		}

		// Force a garbage collection before fetching the next page.
		runtime.GC()
	}
}
// monitorPerformance starts a background goroutine that, once a minute,
// reads the engine's performance statistics and logs queries per second,
// average query time and cache hit rate.
//
// The goroutine has no stop condition; it keeps ticking for as long as the
// process runs.
func monitorPerformance() {
	const interval = 1 * time.Minute

	// report logs one snapshot of the performance statistics.
	report := func() {
		perf := engine.GetPerformanceStats()
		log.Printf("Performance Stats:")
		log.Printf(" Queries/sec: %.2f", perf.QueriesPerSecond)
		log.Printf(" Avg Query Time: %v", perf.AvgQueryTime)
		log.Printf(" Cache Hit Rate: %.2f%%", perf.CacheHitRate)
	}

	go func() {
		tick := time.NewTicker(interval)
		defer tick.Stop()
		for range tick.C {
			report()
		}
	}()
}
// analyzeQueryPerformance enables query analysis on a User session, times a
// query for active users (with an age condition built from pie.Gte("age", 18),
// ordered by created_at), then logs the elapsed time, the number of rows
// returned and the session's query plan.
//
// The elapsed time is measured before the error check, and nothing is logged
// when the query fails.
func analyzeQueryPerformance() error {
	session := pie.Table[User](engine)
	session.EnableQueryAnalysis()

	begin := time.Now()
	users, err := session.
		Where("status", "active").
		Where("age", pie.Gte("age", 18)).
		OrderBy("created_at").
		Find(ctx)
	elapsed := time.Since(begin)
	if err != nil {
		return err
	}

	log.Printf("Query executed in %v", elapsed)
	log.Printf("Returned %d users", len(users))

	// Inspect the plan of the query that was just run.
	log.Printf("Query plan: %+v", session.GetQueryPlan())
	return nil
}
// goodQueryPractices demonstrates recommended query patterns:
//
//   - filter on a field that has an index (email), and
//   - use a projection (Select) to reduce the data transferred.
//
// The query results are discarded because only the query shape matters here;
// the original named-but-unused result variables did not compile. The first
// error encountered is returned.
func goodQueryPractices() error {
	session := pie.Table[User](engine)

	// Query on an indexed field.
	if _, err := session.
		Where("email", "test@example.com"). // email has an index
		Find(ctx); err != nil {
		return err
	}

	// Use a projection to reduce data transfer.
	_, err := session.
		Select("name"). // select only the field that is needed
		Where("status", "active").
		Find(ctx)
	return err
}
// badQueryPractices demonstrates query patterns to AVOID. Both calls are
// intentionally left unoptimised:
//
//   - a Find with no Where condition (full collection scan), and
//   - a Find with no Select (every field is returned).
//
// The results are discarded; the original used an unused variable and
// assigned to an undeclared one, so it did not compile. The first error
// encountered is returned.
func badQueryPractices() error {
	session := pie.Table[User](engine)

	// Avoid: full scan, no WHERE condition.
	if _, err := session.Find(ctx); err != nil {
		return err
	}

	// Avoid: selecting all fields, no Select restriction.
	_, err := session.Find(ctx)
	return err
}
// reasonableCachingStrategy demonstrates matching cache lifetime to how
// often data changes: static data is cached for one hour under "configs",
// while dynamic data is cached for five minutes under "recent_users".
//
// The results are discarded because only the caching setup matters here;
// the original named-but-unused result variables did not compile. The first
// error encountered is returned.
func reasonableCachingStrategy() error {
	session := pie.Table[User](engine)

	// Static data: long-lived cache entry.
	if _, err := session.
		WithCache(1 * time.Hour).
		Cache("configs").
		Find(ctx); err != nil {
		return err
	}

	// Dynamic data: short-lived cache entry.
	_, err := session.
		WithCache(5 * time.Minute).
		Cache("recent_users").
		WhereRecentDays("created_at", 1).
		Find(ctx)
	return err
}
// handlePerformanceErrors logs a tuning hint for performance-related errors:
// timeouts, connection errors, and selected MongoDB error codes. A nil error
// is ignored.
//
// The original matched 12500 as "LockTimeout" and 11600 as "Interrupted".
// In MongoDB's error-code table LockTimeout is 24 and Interrupted is 11601
// (11600 is InterruptedAtShutdown), so those codes are matched as well; the
// original values are kept so existing behaviour is unchanged.
//
// NOTE(review): the type assertion only matches when err itself is a
// pie.MongoError; a wrapped error would not be recognised.
func handlePerformanceErrors(err error) {
	if err == nil {
		return
	}

	if pie.IsTimeoutError(err) {
		log.Println("Query timeout, consider optimizing query or increasing timeout")
	}
	if pie.IsConnectionError(err) {
		log.Println("Connection error, check connection pool settings")
	}

	mongoErr, ok := err.(pie.MongoError)
	if !ok {
		return
	}
	switch mongoErr.Code {
	case 24, 12500: // LockTimeout (24); 12500 kept from the original
		log.Println("Lock timeout, consider reducing concurrent operations")
	case 11600, 11601: // InterruptedAtShutdown (11600), Interrupted (11601)
		log.Println("Operation interrupted, check system resources")
	}
}
// BenchmarkUserQuery measures an uncached query for users with status
// "active" against a test engine. Engine setup happens before the timer is
// reset, so only the queries are timed.
//
// Find is called as Find(ctx) returning (results, error), matching every
// other use in this file; the results are discarded.
func BenchmarkUserQuery(b *testing.B) {
	ctx := context.Background()

	engine, err := createTestEngine()
	require.NoError(b, err)
	defer engine.Disconnect(ctx)

	session := pie.Table[User](engine)

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		if _, err := session.Where("status", "active").Find(ctx); err != nil {
			b.Fatal(err)
		}
	}
}
// BenchmarkUserQueryWithCache measures the same active-users query as
// BenchmarkUserQuery, but on a session configured with a five-minute cache
// and tagged "active_users". Engine setup happens before the timer is reset.
//
// Find is called as Find(ctx) returning (results, error), matching every
// other use in this file; the results are discarded.
func BenchmarkUserQueryWithCache(b *testing.B) {
	ctx := context.Background()

	engine, err := createTestEngine()
	require.NoError(b, err)
	defer engine.Disconnect(ctx)

	session := pie.Table[User](engine).WithCache(5 * time.Minute)

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		if _, err := session.Cache("active_users").Where("status", "active").Find(ctx); err != nil {
			b.Fatal(err)
		}
	}
}
// TestConcurrentQueries runs 1000 active-user queries spread evenly over 100
// goroutines that share one session, and fails the test for every query
// error reported.
//
// Each goroutine stops at its first error, so it sends at most one value;
// the channel is buffered to the goroutine count and sends never block.
//
// Find is called as Find(ctx) returning (results, error), matching every
// other use in this file. The channel is named errCh so it does not shadow
// the standard errors package.
func TestConcurrentQueries(t *testing.T) {
	ctx := context.Background()

	engine, err := createTestEngine()
	require.NoError(t, err)
	defer engine.Disconnect(ctx)

	session := pie.Table[User](engine)

	const (
		concurrency = 100
		queries     = 1000
	)

	var wg sync.WaitGroup
	errCh := make(chan error, concurrency)

	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < queries/concurrency; j++ {
				if _, err := session.Where("status", "active").Find(ctx); err != nil {
					errCh <- err
					return
				}
			}
		}()
	}

	wg.Wait()
	close(errCh)

	// Report every collected error.
	for err := range errCh {
		t.Errorf("Query error: %v", err)
	}
}