Skip to content

Change Streams

Pie provides change stream functionality for real-time data monitoring.

// Create change stream watcher
watcher := pie.NewWatcher[User](engine)
// Watch collection changes
err := watcher.
WatchCollection().
Start(ctx, func(change *pie.ChangeEvent[User]) error {
log.Printf("Change type: %s", change.OperationType)
log.Printf("Document ID: %s", change.DocumentKey)
switch change.OperationType {
case "insert":
log.Printf("New user inserted: %s", change.FullDocument.Name)
case "update":
log.Printf("User updated: %s", change.FullDocument.Name)
case "delete":
log.Printf("User deleted: %s", change.DocumentKey)
}
return nil
})