游标操作
Pie 提供了灵活的游标操作,支持多种数据遍历方式,特别适合处理大量数据或需要流式处理的场景。
// 创建游标cursor, err := session. Where("status", "active"). OrderBy("created_at"). FindCursor(ctx)
if err != nil { log.Fatal("Failed to create cursor:", err)}defer cursor.Close(ctx)方式1: 使用 Next() 和 Decode()
Section titled “方式1: 使用 Next() 和 Decode()”// 逐条处理数据for cursor.Next(ctx) { var user User if err := cursor.Decode(&user); err != nil { log.Printf("Failed to decode user: %v", err) continue }
// 处理用户数据 fmt.Printf("User: %s, Email: %s\n", user.Name, user.Email)
// 可以在这里添加业务逻辑 processUser(&user)}方式2: 使用 All() 一次性获取
Section titled “方式2: 使用 All() 一次性获取”// 一次性获取所有数据var users []Usererr := cursor.All(ctx, &users)if err != nil { log.Fatal("Failed to get all users:", err)}
// 处理所有用户for _, user := range users { fmt.Printf("User: %s, Email: %s\n", user.Name, user.Email) processUser(&user)}方式3: 使用 Iterate() 迭代处理
Section titled “方式3: 使用 Iterate() 迭代处理”// 使用迭代函数处理err := cursor.Iterate(ctx, func(user *User) error { // 处理单个用户 fmt.Printf("User: %s, Email: %s\n", user.Name, user.Email)
// 可以返回错误来中断迭代 if user.Name == "特殊用户" { return errors.New("遇到特殊用户,停止处理") }
return nil})
if err != nil { log.Printf("Iteration error: %v", err)}方式4: 使用 Take() 获取前N个
Section titled “方式4: 使用 Take() 获取前N个”// 获取前5个用户topUsers, err := cursor.Take(ctx, 5)if err != nil { log.Fatal("Failed to take users:", err)}
fmt.Printf("获取到 %d 个用户\n", len(topUsers))for _, user := range topUsers { fmt.Printf("User: %s\n", user.Name)}方式5: 使用 First() 获取第一个
Section titled “方式5: 使用 First() 获取第一个”// 获取第一个用户firstUser, err := cursor.First(ctx)if err != nil { if pie.IsNotFoundError(err) { fmt.Println("没有找到用户") } else { log.Fatal("Failed to get first user:", err) }} else { fmt.Printf("第一个用户: %s\n", firstUser.Name)}const batchSize = 100
cursor, err := session. Where("status", "active"). OrderBy("created_at"). FindCursor(ctx)
if err != nil { log.Fatal("Failed to create cursor:", err)}defer cursor.Close(ctx)
var batch []Userfor cursor.Next(ctx) { var user User if err := cursor.Decode(&user); err != nil { continue }
batch = append(batch, user)
// 当批次满了或游标结束时处理 if len(batch) >= batchSize { processBatch(batch) batch = batch[:0] // 清空批次 }}
// 处理剩余的记录if len(batch) > 0 { processBatch(batch)}
func processBatch(users []User) { fmt.Printf("处理批次,包含 %d 个用户\n", len(users))
// 批量处理逻辑 for _, user := range users { // 处理用户 processUser(&user) }}cursor, err := session. Where("status", "active"). OrderBy("created_at"). FindCursor(ctx)
if err != nil { log.Fatal("Failed to create cursor:", err)}defer cursor.Close(ctx)
processedCount := 0maxProcess := 1000
for cursor.Next(ctx) && processedCount < maxProcess { var user User if err := cursor.Decode(&user); err != nil { continue }
// 处理用户 processUser(&user) processedCount++
// 每处理100个用户输出进度 if processedCount%100 == 0 { fmt.Printf("已处理 %d 个用户\n", processedCount) }}
fmt.Printf("总共处理了 %d 个用户\n", processedCount)错误处理和重试
Section titled “错误处理和重试”func processUsersWithRetry(session *pie.Session[User], maxRetries int) error { cursor, err := session. Where("status", "active"). OrderBy("created_at"). FindCursor(context.Background())
if err != nil { return fmt.Errorf("failed to create cursor: %w", err) } defer cursor.Close(context.Background())
for cursor.Next(context.Background()) { var user User if err := cursor.Decode(&user); err != nil { log.Printf("Failed to decode user: %v", err) continue }
// 带重试的处理 if err := processUserWithRetry(&user, maxRetries); err != nil { log.Printf("Failed to process user %s after %d retries: %v", user.Name, maxRetries, err) continue } }
return nil}
func processUserWithRetry(user *User, maxRetries int) error { for i := 0; i < maxRetries; i++ { if err := processUser(user); err != nil { if i == maxRetries-1 { return err }
// 等待一段时间后重试 time.Sleep(time.Duration(i+1) * time.Second) continue } return nil } return errors.New("max retries exceeded")}实际应用场景
Section titled “实际应用场景”func migrateUsers() error { // 源数据库游标 sourceCursor, err := sourceSession. Where("status", "active"). OrderBy("created_at"). FindCursor(context.Background())
if err != nil { return fmt.Errorf("failed to create source cursor: %w", err) } defer sourceCursor.Close(context.Background())
// 目标数据库会话 targetSession := pie.Table[User](targetEngine)
batchSize := 100 var batch []User processedCount := 0
for sourceCursor.Next(context.Background()) { var user User if err := sourceCursor.Decode(&user); err != nil { log.Printf("Failed to decode user: %v", err) continue }
// 转换数据 migratedUser := migrateUserData(user) batch = append(batch, migratedUser)
// 批量插入 if len(batch) >= batchSize { if err := insertBatch(targetSession, batch); err != nil { return fmt.Errorf("failed to insert batch: %w", err) } processedCount += len(batch) batch = batch[:0] log.Printf("已迁移 %d 个用户", processedCount) } }
// 处理剩余数据 if len(batch) > 0 { if err := insertBatch(targetSession, batch); err != nil { return fmt.Errorf("failed to insert final batch: %w", err) } processedCount += len(batch) }
log.Printf("迁移完成,总共迁移了 %d 个用户", processedCount) return nil}
func insertBatch(session *pie.Session[User], users []User) error { bulkWrite := pie.NewBulkWrite[User](session.Engine()) for _, user := range users { bulkWrite.InsertOne(&user) } _, err := bulkWrite.ExecuteOrdered(context.Background()) return err}func exportUsersToCSV(w io.Writer) error { cursor, err := session. Where("status", "active"). OrderBy("created_at"). FindCursor(context.Background())
if err != nil { return fmt.Errorf("failed to create cursor: %w", err) } defer cursor.Close(context.Background())
// 写入CSV头部 csvWriter := csv.NewWriter(w) defer csvWriter.Flush()
if err := csvWriter.Write([]string{"ID", "Name", "Email", "Age", "CreatedAt"}); err != nil { return err }
// 逐行写入数据 for cursor.Next(context.Background()) { var user User if err := cursor.Decode(&user); err != nil { log.Printf("Failed to decode user: %v", err) continue }
record := []string{ user.ID.Hex(), user.Name, user.Email, strconv.Itoa(user.Age), user.CreatedAt.Format(time.RFC3339), }
if err := csvWriter.Write(record); err != nil { return err } }
return nil}实时数据处理
Section titled “实时数据处理”func processRealTimeData() { ticker := time.NewTicker(30 * time.Second) defer ticker.Stop()
for { select { case <-ticker.C: if err := processPendingData(); err != nil { log.Printf("Failed to process pending data: %v", err) } } }}
func processPendingData() error { cursor, err := session. Where("status", "pending"). Where("created_at", pie.Gte("created_at", time.Now().Add(-1*time.Hour))). OrderBy("created_at"). FindCursor(context.Background())
if err != nil { return err } defer cursor.Close(context.Background())
processedCount := 0 for cursor.Next(context.Background()) { var user User if err := cursor.Decode(&user); err != nil { continue }
// 处理用户 if err := processUser(&user); err != nil { log.Printf("Failed to process user %s: %v", user.Name, err) continue }
// 更新状态 if err := session.Where("_id", user.ID).Update(context.Background(), bson.D{{"$set", bson.D{{"status", "processed"}}}}); err != nil { log.Printf("Failed to update user status: %v", err) }
processedCount++ }
if processedCount > 0 { log.Printf("处理了 %d 个待处理用户", processedCount) }
return nil}1. 使用投影减少数据传输
Section titled “1. 使用投影减少数据传输”// 只选择需要的字段cursor, err := session. Select("name", "email", "status"). Where("status", "active"). FindCursor(ctx)2. 合理设置批次大小
Section titled “2. 合理设置批次大小”// 根据内存和性能需求调整批次大小const ( SmallBatchSize = 50 // 内存受限环境 MediumBatchSize = 100 // 一般场景 LargeBatchSize = 500 // 高性能场景)3. 使用索引优化
Section titled “3. 使用索引优化”// 确保排序字段有索引cursor, err := session. Where("status", "active"). OrderBy("created_at"). // created_at 应该有索引 FindCursor(ctx)4. 避免在游标中执行复杂操作
Section titled “4. 避免在游标中执行复杂操作”// 好的做法:先获取数据,再处理var users []Usererr := cursor.All(ctx, &users)if err != nil { return err}
// 在内存中处理数据for _, user := range users { processUser(&user)}
// 避免的做法:在游标循环中执行复杂操作for cursor.Next(ctx) { var user User cursor.Decode(&user)
// 避免在游标中执行复杂查询 relatedData, err := getRelatedData(user.ID) // 这会导致N+1查询问题}游标错误处理
Section titled “游标错误处理”func processUsersSafely() error { cursor, err := session. Where("status", "active"). FindCursor(context.Background())
if err != nil { return fmt.Errorf("failed to create cursor: %w", err) } defer func() { if closeErr := cursor.Close(context.Background()); closeErr != nil { log.Printf("Failed to close cursor: %v", closeErr) } }()
for cursor.Next(context.Background()) { var user User if err := cursor.Decode(&user); err != nil { log.Printf("Failed to decode user: %v", err) continue }
if err := processUser(&user); err != nil { log.Printf("Failed to process user %s: %v", user.Name, err) // 根据业务需求决定是否继续 continue } }
// 检查游标错误 if err := cursor.Err(); err != nil { return fmt.Errorf("cursor error: %w", err) }
return nil}1. 总是关闭游标
Section titled “1. 总是关闭游标”cursor, err := session.FindCursor(ctx)if err != nil { return err}defer cursor.Close(ctx) // 确保游标被关闭2. 处理游标错误
Section titled “2. 处理游标错误”for cursor.Next(ctx) { // 处理数据}
// 检查游标是否有错误if err := cursor.Err(); err != nil { log.Printf("Cursor error: %v", err)}3. 合理使用内存
Section titled “3. 合理使用内存”// 对于大数据集,使用流式处理而不是一次性加载for cursor.Next(ctx) { var user User cursor.Decode(&user) processUser(&user) // 处理完立即释放内存}
// 而不是users, err := cursor.All(ctx, &users) // 可能消耗大量内存