性能优化
Pie 提供了多种性能优化功能,帮助您构建高性能的数据库应用程序。
// 优化连接池配置engine, err := pie.NewEngine(ctx, "mydb", pie.WithMaxPoolSize(100), // 最大连接数 pie.WithMinPoolSize(10), // 最小连接数 pie.WithMaxIdleTime(30*time.Minute), // 最大空闲时间 pie.WithConnectTimeout(10*time.Second), // 连接超时 pie.WithSocketTimeout(30*time.Second), // 套接字超时)func monitorConnectionPool(engine *pie.Engine) { go func() { ticker := time.NewTicker(30 * time.Second) defer ticker.Stop()
for { select { case <-ticker.C: stats := engine.GetConnectionStats() log.Printf("Connection Pool Stats:") log.Printf(" Active: %d", stats.ActiveConnections) log.Printf(" Idle: %d", stats.IdleConnections) log.Printf(" Total: %d", stats.TotalConnections) } } }()}// 为常用查询创建索引func createOptimizedIndexes() error { indexes := pie.MustIndexes(engine)
// 单字段索引 err := indexes.CreateIndex(ctx, "users", bson.D{{"email", 1}}) if err != nil { return err }
// 复合索引 err = indexes.CreateIndex(ctx, "users", bson.D{ {"status", 1}, {"created_at", -1}, }) if err != nil { return err }
// 部分索引 err = indexes.CreateIndex(ctx, "users", bson.D{{"email", 1}}, &options.IndexOptions{ PartialFilterExpression: bson.D{{"status", "active"}}, })
return err}// 只选择需要的字段func getUsersOptimized() ([]User, error) { session := pie.Table[User](engine)
users, err := session. Select("name", "email", "status"). // 只选择需要的字段 Where("status", "active"). Find(ctx)
return users, err}// 使用 LIMIT 避免返回过多数据func getRecentUsers(limit int) ([]User, error) { session := pie.Table[User](engine)
users, err := session. Where("status", "active"). OrderByDesc("created_at"). Limit(limit). Find(ctx)
return users, err}// 配置多级缓存func setupMultiLevelCache() error { ristrettoCache, err := pie.NewRistrettoCache(nil) if err != nil { return err }
redisCache, err := pie.NewRedisCache(&pie.RedisCacheConfig{ Addr: "localhost:6379", }) if err != nil { return err }
engine.UseCache(ristrettoCache, redisCache) return nil}func warmupCache(ctx context.Context, engine *pie.Engine) { session := pie.Table[User](engine)
queries := []struct { name string query func() *pie.Session[User] }{ { name: "active_users", query: func() *pie.Session[User] { return session.Where("status", "active").Cache(1 * time.Hour) }, }, { name: "admin_users", query: func() *pie.Session[User] { return session.Where("role", "admin").Cache(1 * time.Hour) }, }, }
for _, q := range queries { users, err := q.query().Cache(q.name).Find(ctx) if err != nil { log.Printf("Failed to warmup cache for %s: %v", q.name, err) } else { log.Printf("Warmed up cache for %s: %d users", q.name, len(users)) } }}// 根据数据更新频率设置不同的缓存时间const ( UserCacheTTL = 10 * time.Minute // 用户信息,更新频率中等 ConfigCacheTTL = 1 * time.Hour // 配置信息,更新频率低 StatsCacheTTL = 30 * time.Minute // 统计数据,更新频率中等 SessionCacheTTL = 5 * time.Minute // 会话信息,更新频率高)
func getCachedUsers(ctx context.Context, dataType string) ([]User, error) { session := pie.Table[User](engine)
ttl := 5 * time.Minute switch dataType { case "user": ttl = UserCacheTTL case "config": ttl = ConfigCacheTTL case "stats": ttl = StatsCacheTTL case "session": ttl = SessionCacheTTL }
users, err := session.WithCache(ttl).Cache(dataType).Find(ctx) return any(users), err
}批量操作优化
Section titled “批量操作优化”
// batchInsertUsers inserts users in fixed-size batches so that no single
// insert operation becomes too large.
func batchInsertUsers(users []*User) error {
	session := pie.Table[User](engine)

	// Insert in batches to keep each round trip bounded.
	batchSize := 1000
	for i := 0; i < len(users); i += batchSize {
		end := i + batchSize
		if end > len(users) {
			end = len(users)
		}

		batch := users[i:end]
		_, err := session.InsertMany(ctx, batch)
		if err != nil {
			return fmt.Errorf("failed to insert batch %d-%d: %w", i, end-1, err)
		}
	}

	return nil
}

// batchUpdateUsers applies many per-document updates through one ordered
// bulk write instead of issuing individual update commands.
func batchUpdateUsers(updates []UserUpdate) error {
	bulkWrite := pie.NewBulkWrite[User](engine)

	for _, update := range updates {
		bulkWrite.UpdateOne(
			bson.D{{"_id", update.ID}},
			bson.D{{"$set", update.Data}},
		)
	}

	result, err := bulkWrite.ExecuteOrdered(ctx)
	if err != nil {
		return err
	}

	log.Printf("Updated %d users", result.ModifiedCount)
	return nil
}
聚合管道优化
Section titled “聚合管道优化”
// optimizedAggregation narrows the data set early with $match, groups by
// role, and caps the result size.
func optimizedAggregation() error {
	aggregate := pie.NewAggregate[User](engine)

	// Put $match first to reduce the amount of data entering later stages.
	result, err := aggregate.
		MatchStage().
		Where("status", "active").
		Where("created_at", pie.Gte("created_at", time.Now().AddDate(0, -1, 0))).
		GroupStage().
		By("role", "$role").
		Count("total").
		Avg("avgAge", "$age").
		Done().
		SortStage().Desc("total").
		LimitStage(10). // cap the number of results
		Exec(ctx)

	if err != nil {
		return err
	}

	log.Printf("Aggregation result: %+v", result.Data)
	return nil
}

// createAggregationIndexes builds a compound index covering the fields the
// aggregation above matches, sorts, and groups on.
func createAggregationIndexes() error {
	indexes := pie.MustIndexes(engine)

	// Compound index for the aggregation query.
	err := indexes.CreateIndex(ctx, "users", bson.D{
		{"status", 1},
		{"created_at", -1},
		{"role", 1},
	})
	if err != nil {
		return err
	}

	return nil
}

// processLargeDataset streams matching users through a cursor so the whole
// result set never has to fit in memory at once.
func processLargeDataset() error {
	session := pie.Table[User](engine)

	cursor, err := session.
		Where("status", "active").
		OrderBy("created_at").
		FindCursor(ctx)

	if err != nil {
		return err
	}
	defer cursor.Close(ctx)

	// Decode and handle one document at a time to avoid memory blow-up.
	// NOTE(review): decode failures are silently skipped — confirm that is intended.
	for cursor.Next(ctx) {
		var user User
		if err := cursor.Decode(&user); err != nil {
			continue
		}

		// Handle a single user.
		processUser(&user)
	}

	return nil
}

// processDataInPages walks the result set page by page until a page comes
// back empty.
func processDataInPages(pageSize int) error {
	session := pie.Table[User](engine)

	page := 1
	for {
		result, err := session.
			Where("status", "active").
			OrderBy("created_at").
			Paginate(ctx, pie.PaginateParams{
				Page:     page,
				PageSize: pageSize,
			})

		if err != nil {
			return err
		}

		if len(result.Data) == 0 {
			break // no more data
		}

		// Handle the current page of results.
		for _, user := range result.Data {
			processUser(&user)
		}

		page++

		// Force a garbage collection between pages.
		// NOTE(review): explicit runtime.GC() is rarely beneficial; confirm it is needed here.
		runtime.GC()
	}

	return nil
}

// monitorPerformance logs engine performance statistics once a minute from a
// background goroutine.
func monitorPerformance() {
	go func() {
		ticker := time.NewTicker(1 * time.Minute)
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				stats := engine.GetPerformanceStats()
				log.Printf("Performance Stats:")
				log.Printf(" Queries/sec: %.2f", stats.QueriesPerSecond)
				log.Printf(" Avg Query Time: %v", stats.AvgQueryTime)
				log.Printf(" Cache Hit Rate: %.2f%%", stats.CacheHitRate)
			}
		}
	}()
}

// analyzeQueryPerformance times a representative query and dumps its plan.
func analyzeQueryPerformance() error {
	session := pie.Table[User](engine)

	// Enable query analysis.
	session.EnableQueryAnalysis()

	start := time.Now()

	users, err := session.
		Where("status", "active").
		Where("age", pie.Gte("age", 18)).
		OrderBy("created_at").
		Find(ctx)

	duration := time.Since(start)

	if err != nil {
		return err
	}

	log.Printf("Query executed in %v", duration)
	log.Printf("Returned %d users", len(users))

	// Inspect the query plan.
	plan := session.GetQueryPlan()
	log.Printf("Query plan: %+v", plan)

	return nil
}
1. 查询优化原则
Section titled “1. 查询优化原则”
// Good query practices.
// goodQueryPractices demonstrates recommended query patterns: filter on an
// indexed field and project only the fields you need.
func goodQueryPractices() error {
	session := pie.Table[User](engine)

	// Query on an indexed field.
	users, err := session.
		Where("email", "test@example.com"). // email is indexed
		Find(ctx)
	// BUG FIX: users was declared but never used, which does not compile in
	// Go; discard it explicitly since the example only shows the query shape.
	_ = users

	if err != nil {
		return err
	}

	// Use a projection to reduce the amount of data transferred.
	userNames, err := session.
		Select("name"). // fetch only the required field
		Where("status", "active").
		Find(ctx)
	_ = userNames // discarded for the same reason as above

	return err
}
// 避免的查询实践func badQueryPractices() error { session := pie.Table[User](engine)
// 避免全表扫描 users, err := session.Find(ctx) // 没有 WHERE 条件
if err != nil { return err }
// 避免选择所有字段 allUsers, err = session.Find(ctx) // 没有 Select 限制
return err}2. 缓存策略
Section titled “2. 缓存策略”
// A reasonable caching strategy.
// reasonableCachingStrategy matches cache TTLs to data volatility: static
// data is cached for a long time, dynamic data only briefly.
func reasonableCachingStrategy() error {
	session := pie.Table[User](engine)

	// Cache static data for a long time.
	configs, err := session.
		WithCache(1 * time.Hour).
		Cache("configs").
		Find(ctx)
	// BUG FIX: discard explicitly — an unused local does not compile in Go.
	_ = configs

	if err != nil {
		return err
	}

	// Cache dynamic data for a short time.
	recentUsers, err := session.
		WithCache(5 * time.Minute).
		Cache("recent_users").
		WhereRecentDays("created_at", 1).
		Find(ctx)
	_ = recentUsers

	return err
}
3. 错误处理
Section titled “3. 错误处理”// 性能相关的错误处理func handlePerformanceErrors(err error) { if pie.IsTimeoutError(err) { log.Println("Query timeout, consider optimizing query or increasing timeout") }
if pie.IsConnectionError(err) { log.Println("Connection error, check connection pool settings") }
if mongoErr, ok := err.(pie.MongoError); ok { switch mongoErr.Code { case 12500: // LockTimeout log.Println("Lock timeout, consider reducing concurrent operations") case 11600: // Interrupted log.Println("Operation interrupted, check system resources") } }}func BenchmarkUserQuery(b *testing.B) { engine, err := createTestEngine() require.NoError(b, err) defer engine.Disconnect(context.Background())
session := pie.Table[User](engine)
b.ResetTimer() for i := 0; i < b.N; i++ { var users []User err := session.Where("status", "active").Find(context.Background(), &users) if err != nil { b.Fatal(err) } }}
func BenchmarkUserQueryWithCache(b *testing.B) { engine, err := createTestEngine() require.NoError(b, err) defer engine.Disconnect(context.Background())
session := pie.Table[User](engine).WithCache(5 * time.Minute)
b.ResetTimer() for i := 0; i < b.N; i++ { var users []User err := session.Cache("active_users").Where("status", "active").Find(context.Background(), &users) if err != nil { b.Fatal(err) } }}func TestConcurrentQueries(t *testing.T) { engine, err := createTestEngine() require.NoError(t, err) defer engine.Disconnect(context.Background())
session := pie.Table[User](engine)
// 并发查询测试 concurrency := 100 queries := 1000
var wg sync.WaitGroup errors := make(chan error, concurrency)
for i := 0; i < concurrency; i++ { wg.Add(1) go func() { defer wg.Done()
for j := 0; j < queries/concurrency; j++ { var users []User err := session.Where("status", "active").Find(context.Background(), &users) if err != nil { errors <- err return } } }() }
wg.Wait() close(errors)
// 检查错误 for err := range errors { t.Errorf("Query error: %v", err) }}