From faed5bb6ab25393b815e5013d63ec14123f6c1eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A8=8B=E5=B9=BF?= Date: Fri, 20 Mar 2026 17:10:24 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=E7=BC=96=E8=AF=91?= =?UTF-8?q?=E9=94=99=E8=AF=AF=E5=92=8C=20HTTP=20upsert=20=E5=8A=9F?= =?UTF-8?q?=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - examples/stream_aggregate_example.go: 使用循环调用 Insert 替代不存在的 InsertMany - internal/engine/crud_handler.go: Update 方法添加 upsert 参数 - internal/protocol/http/server.go: 传递 op.Upsert 到 CRUDHandler - internal/engine/memory_store.go: upsert 时优先使用 filter 中的 _id Co-Authored-By: Claude Opus 4.6 --- examples/stream_aggregate_example.go | 8 +++++--- internal/engine/crud_handler.go | 4 ++-- internal/engine/memory_store.go | 6 ++++++ internal/protocol/http/server.go | 2 +- 4 files changed, 14 insertions(+), 6 deletions(-) diff --git a/examples/stream_aggregate_example.go b/examples/stream_aggregate_example.go index fd5894d..f91d90f 100644 --- a/examples/stream_aggregate_example.go +++ b/examples/stream_aggregate_example.go @@ -36,9 +36,11 @@ func main() { docs = append(docs, doc) } - if err := store.InsertMany(collection, docs); err != nil { - log.Printf("Error inserting documents: %v", err) - return + for _, doc := range docs { + if err := store.Insert(collection, doc); err != nil { + log.Printf("Error inserting document: %v", err) + return + } } // 定义聚合管道 diff --git a/internal/engine/crud_handler.go b/internal/engine/crud_handler.go index 6141c1a..24b16e6 100644 --- a/internal/engine/crud_handler.go +++ b/internal/engine/crud_handler.go @@ -54,8 +54,8 @@ func (h *CRUDHandler) Insert(ctx context.Context, collection string, docs []map[ } // Update 更新文档 -func (h *CRUDHandler) Update(ctx context.Context, collection string, filter types.Filter, update types.Update) (*types.UpdateResult, error) { - matched, modified, _, err := h.store.Update(collection, filter, update, false, nil) +func (h *CRUDHandler) Update(ctx context.Context, collection string, filter types.Filter, update types.Update, upsert bool) (*types.UpdateResult, error) { + matched, modified, _, err := h.store.Update(collection, filter, update, upsert, nil) if err != nil { return nil, err } diff --git a/internal/engine/memory_store.go b/internal/engine/memory_store.go index 847d3db..7907dca 100644 --- a/internal/engine/memory_store.go +++ b/internal/engine/memory_store.go @@ -388,6 +388,12 @@ func (ms *MemoryStore) Update(collection string, filter types.Filter, update typ if matched == 0 && upsert { // 创建新文档 newID := generateID() + // 优先使用 filter 中的 _id + if idVal, ok := filter["_id"]; ok { + if idStr, ok := idVal.(string); ok && idStr != "" { + newID = idStr + } + } newDoc := make(map[string]interface{}) // 应用更新($setOnInsert 会生效) diff --git a/internal/protocol/http/server.go b/internal/protocol/http/server.go index 2447d63..fa7f4e8 100644 --- a/internal/protocol/http/server.go +++ b/internal/protocol/http/server.go @@ -271,7 +271,7 @@ func (h *RequestHandler) HandleUpdate(w http.ResponseWriter, r *http.Request, db upserted := make([]types.UpsertID, 0) for _, op := range req.Updates { - result, err := h.crud.Update(context.Background(), fullCollection, op.Q, op.U) + result, err := h.crud.Update(context.Background(), fullCollection, op.Q, op.U, op.Upsert) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return