diff --git a/Cargo.lock b/Cargo.lock index b5cb69aaf..a60491673 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1940,7 +1940,7 @@ dependencies = [ [[package]] name = "mongocrypt" version = "0.3.1" -source = "git+https://github.com/mongodb/libmongocrypt-rust.git?branch=main#0f34015fcde37d805c0e7ead397965ade63bdb07" +source = "git+https://github.com/isabelatkinson/libmongocrypt-rust.git?branch=lookup#047f4d05f15b92de6eea9682f8295e47912fc4ba" dependencies = [ "bson 2.15.0", "bson 3.0.0", @@ -1952,7 +1952,7 @@ dependencies = [ [[package]] name = "mongocrypt-sys" version = "0.1.4+1.12.0" -source = "git+https://github.com/mongodb/libmongocrypt-rust.git?branch=main#0f34015fcde37d805c0e7ead397965ade63bdb07" +source = "git+https://github.com/isabelatkinson/libmongocrypt-rust.git?branch=lookup#047f4d05f15b92de6eea9682f8295e47912fc4ba" [[package]] name = "mongodb" diff --git a/Cargo.toml b/Cargo.toml index 66728f766..692f7df62 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -171,8 +171,8 @@ features = ["serde"] [dependencies.mongocrypt] version = "0.3.1" -git = "https://github.com/mongodb/libmongocrypt-rust.git" -branch = "main" +git = "https://github.com/isabelatkinson/libmongocrypt-rust.git" +branch = "lookup" default-features = false optional = true diff --git a/src/client/csfle.rs b/src/client/csfle.rs index 79736c33a..7a93631f1 100644 --- a/src/client/csfle.rs +++ b/src/client/csfle.rs @@ -101,7 +101,8 @@ impl ClientState { .use_need_kms_credentials_state() .retry_kms(true)? .use_range_v2()? - .use_need_mongo_collinfo_with_db_state(); + .use_need_mongo_collinfo_with_db_state() + .enable_multiple_collinfo()?; if let Some(m) = &opts.schema_map { builder = builder.schema_map(&crate::bson_compat::serialize_to_document(m)?)?; } diff --git a/src/client/csfle/state_machine.rs b/src/client/csfle/state_machine.rs index f2644b97f..0cd12ca26 100644 --- a/src/client/csfle/state_machine.rs +++ b/src/client/csfle/state_machine.rs @@ -116,7 +116,7 @@ impl CryptExecutor { Error::internal("db required for NeedMongoCollinfo state") })?); let mut cursor = db.list_collections().filter(filter).await?; - if cursor.advance().await? { + while cursor.advance().await? { ctx.mongo_feed(cursor.current())?; } ctx.mongo_done()?; diff --git a/src/test/csfle/prose.rs b/src/test/csfle/prose.rs index 60b9b4bce..47a98a281 100644 --- a/src/test/csfle/prose.rs +++ b/src/test/csfle/prose.rs @@ -8,7 +8,7 @@ use std::{ time::Duration, }; -use futures_util::TryStreamExt; +use futures_util::{StreamExt, TryStreamExt}; use mongocrypt::ctx::Algorithm; use tokio::net::TcpListener; @@ -40,11 +40,13 @@ use crate::{ command::{CommandFailedEvent, CommandStartedEvent, CommandSucceededEvent}, sdam::SdamEvent, }, - options::{EncryptOptions, FindOptions, IndexOptions, WriteConcern}, + options::{CreateCollectionOptions, EncryptOptions, FindOptions, IndexOptions, WriteConcern}, runtime, test::{ get_client_options, log_uncaptured, + mongocrypt_version_lt, + server_version_gte, server_version_lt, topology_is_standalone, util::{ @@ -57,6 +59,7 @@ use crate::{ Client, Collection, IndexModel, + Namespace, }; use super::{ @@ -2116,6 +2119,413 @@ async fn range_explicit_encryption_defaults() -> Result<()> { Ok(()) } +// Prose test 25. $lookup +mod lookup { + use super::*; + + async fn set_up() -> Option { + if mongocrypt_version_lt("1.13.0") + || server_version_lt(7, 0).await + || topology_is_standalone().await + { + log_uncaptured( + "skipping $lookup test: requires mongocrypt 1.13+, server 7.0+, non-standalone \ + topology", + ); + return None; + } + + let encrypted_client = Client::encrypted_builder( + get_client_options().await.clone(), + Namespace::new("db", "keyvault"), + [LOCAL_KMS.clone()], + ) + .unwrap() + .build() + .await + .unwrap(); + + let keyvault = encrypted_client.database("db").collection("keyvault"); + keyvault.drop().await.unwrap(); + let key_doc: Document = load_testdata("data/lookup/key-doc.json").unwrap(); + keyvault + .insert_one(key_doc) + .write_concern(WriteConcern::majority()) + .await + .unwrap(); + + let drop_and_create = async |coll: &str, options: Option| { + let db = encrypted_client.database("db"); + db.collection::(coll).drop().await.unwrap(); + db.create_collection(coll) + .with_options(options) + .await + .unwrap(); + }; + + let schema_csfle: Document = load_testdata("data/lookup/schema-csfle.json").unwrap(); + drop_and_create( + "csfle", + Some( + CreateCollectionOptions::builder() + .validator(doc! { "$jsonSchema": schema_csfle }) + .build(), + ), + ) + .await; + + let schema_csfle2: Document = load_testdata("data/lookup/schema-csfle2.json").unwrap(); + drop_and_create( + "csfle2", + Some( + CreateCollectionOptions::builder() + .validator(doc! { "$jsonSchema": schema_csfle2 }) + .build(), + ), + ) + .await; + + let schema_qe: Document = load_testdata("data/lookup/schema-qe.json").unwrap(); + drop_and_create( + "qe", + Some( + CreateCollectionOptions::builder() + .encrypted_fields(schema_qe) + .build(), + ), + ) + .await; + + let schema_qe2: Document = load_testdata("data/lookup/schema-qe2.json").unwrap(); + drop_and_create( + "qe2", + Some( + CreateCollectionOptions::builder() + .encrypted_fields(schema_qe2) + .build(), + ), + ) + .await; + + drop_and_create("no_schema", None).await; + drop_and_create("no_schema2", None).await; + + let schema_non_csfle: Document = + load_testdata("data/lookup/schema-non-csfle.json").unwrap(); + drop_and_create( + "non_csfle_schema", + Some( + CreateCollectionOptions::builder() + .validator(doc! { "$jsonSchema": schema_non_csfle }) + .build(), + ), + ) + .await; + + let unencrypted_client = Client::for_test().await; + let insert = async |name: &str| -> Bson { + let coll = encrypted_client.database("db").collection(name); + let doc = doc! { name: name }; + coll.insert_one(doc).await.unwrap().inserted_id + }; + let insert_and_find = async |name: &str| { + let inserted_id = insert(name).await; + let coll = unencrypted_client + .database("db") + .collection::(name); + let inserted = coll + .find_one(doc! { "_id": inserted_id }) + .await + .unwrap() + .unwrap(); + let value = inserted.get(name).unwrap(); + assert!(matches!(value, Bson::Binary(_))); + }; + + insert_and_find("csfle").await; + insert_and_find("csfle2").await; + insert_and_find("qe").await; + insert_and_find("qe2").await; + insert("no_schema").await; + insert("no_schema2").await; + insert("non_csfle_schema").await; + + Some( + Client::encrypted_builder( + get_client_options().await.clone(), + Namespace::new("db", "keyvault"), + [LOCAL_KMS.clone()], + ) + .unwrap() + .build() + .await + .unwrap(), + ) + } + + async fn test_successful_lookup(coll: &str, pipeline: Vec, expected: Document) { + if server_version_lt(8, 1).await { + log_uncaptured("skipping $lookup test: requires server version 8.1+"); + return; + } + let Some(client) = set_up().await else { + return; + }; + + let coll = client.database("db").collection::(coll); + let mut cursor = coll.aggregate(pipeline).await.unwrap(); + + let doc = cursor.next().await.unwrap().unwrap(); + assert_eq!(doc, expected); + assert!(cursor.next().await.is_none()); + } + + #[tokio::test] + async fn case_1() { + let coll = "csfle"; + let pipeline = vec![ + doc! { "$match": { "csfle": "csfle" } }, + doc! { + "$lookup": { + "from": "no_schema", + "as": "matched", + "pipeline": [ + { "$match": { "no_schema": "no_schema" } }, + { "$project": { "_id": 0 } } + ], + } + }, + doc! { "$project": { "_id": 0 } }, + ]; + let expected = doc! { "csfle": "csfle", "matched": [ { "no_schema": "no_schema" } ] }; + test_successful_lookup(coll, pipeline, expected).await; + } + + #[tokio::test] + async fn case_2() { + let coll = "qe"; + let pipeline = vec![ + doc! { "$match": { "qe": "qe" } }, + doc! { + "$lookup": { + "from": "no_schema", + "as": "matched", + "pipeline": [ + { "$match": { "no_schema": "no_schema" } }, + { "$project": { "_id": 0, "__safeContent__": 0 } }, + ], + } + }, + doc! { "$project": { "_id": 0, "__safeContent__": 0 } }, + ]; + let expected = doc! { "qe": "qe", "matched": [ { "no_schema": "no_schema" } ] }; + test_successful_lookup(coll, pipeline, expected).await; + } + + #[tokio::test] + async fn case_3() { + let coll = "no_schema"; + let pipeline = vec![ + doc! { "$match": { "no_schema": "no_schema" } }, + doc! { + "$lookup": { + "from": "csfle", + "as": "matched", + "pipeline": [ + { "$match": { "csfle": "csfle" } }, + { "$project": { "_id": 0 } }, + ], + } + }, + doc! { "$project": { "_id": 0 } }, + ]; + let expected = doc! { "no_schema": "no_schema", "matched": [ { "csfle": "csfle" } ] }; + test_successful_lookup(coll, pipeline, expected).await; + } + + #[tokio::test] + async fn case_4() { + let coll = "no_schema"; + let pipeline = vec![ + doc! { "$match": { "no_schema": "no_schema" } }, + doc! { + "$lookup": { + "from": "qe", + "as": "matched", + "pipeline": [ + { "$match": { "qe": "qe" } }, + { "$project": { "_id": 0, "__safeContent__": 0 } }, + ] + } + }, + doc! { "$project": { "_id": 0 } }, + ]; + let expected = doc! { "no_schema": "no_schema", "matched": [ { "qe": "qe" } ] }; + test_successful_lookup(coll, pipeline, expected).await; + } + + #[tokio::test] + async fn case_5() { + let coll = "csfle"; + let pipeline = vec![ + doc! { "$match": { "csfle": "csfle" } }, + doc! { + "$lookup": { + "from": "csfle2", + "as": "matched", + "pipeline": [ + { "$match": { "csfle2": "csfle2" } }, + { "$project": { "_id": 0 } } + ] + } + }, + doc! { "$project": { "_id": 0 } }, + ]; + let expected = doc! { "csfle": "csfle", "matched": [{ "csfle2": "csfle2" }] }; + test_successful_lookup(coll, pipeline, expected).await; + } + + #[tokio::test] + async fn case_6() { + let coll = "qe"; + let pipeline = vec![ + doc! { "$match": { "qe": "qe" } }, + doc! { + "$lookup": { + "from": "qe2", + "as": "matched", + "pipeline": [ + { "$match": { "qe2": "qe2" } }, + { "$project": { "_id": 0, "__safeContent__": 0 } }, + ], + } + }, + doc! { "$project": { "_id": 0, "__safeContent__": 0 } }, + ]; + let expected = doc! { "qe": "qe", "matched": [ { "qe2": "qe2" } ] }; + test_successful_lookup(coll, pipeline, expected).await; + } + + #[tokio::test] + async fn case_7() { + let coll = "no_schema"; + let pipeline = vec![ + doc! { "$match": { "no_schema": "no_schema" } }, + doc! { + "$lookup": { + "from": "no_schema2", + "as": "matched", + "pipeline": [ + { "$match": { "no_schema2": "no_schema2" } }, + { "$project": { "_id": 0 } }, + ] + } + }, + doc! { "$project": { "_id": 0 } }, + ]; + let expected = + doc! { "no_schema": "no_schema", "matched": [ { "no_schema2": "no_schema2" } ] }; + test_successful_lookup(coll, pipeline, expected).await; + } + + #[tokio::test] + async fn case_8() { + if server_version_lt(8, 1).await { + log_uncaptured("skipping $lookup test: requires server version 8.1+"); + return; + } + let Some(client) = set_up().await else { + return; + }; + + let coll = client.database("db").collection::("csfle"); + let pipeline = vec![ + doc! { "$match": { "csfle": "qe" } }, + doc! { + "$lookup": { + "from": "qe", + "as": "matched", + "$pipeline": [ + { "$match": { "qe": "qe" } }, + { "$project": { "_id": 0 } }, + + ] + } + }, + doc! { "$project": { "_id": 0 } }, + ]; + let error = coll.aggregate(pipeline).await.unwrap_err(); + + let message = error.message().unwrap(); + if server_version_lt(8, 2).await || mongocrypt_version_lt("1.17.0") { + assert!(message.contains("not supported")); + } else { + assert!(message.contains( + "Cannot specify both encryptionInformation and csfleEncryptionSchemas unless \ + csfleEncryptionSchemas only contains non-encryption JSON schema validators" + )); + } + } + + #[tokio::test] + async fn case_9() { + if server_version_gte(8, 1).await { + log_uncaptured("skipping $lookup test: requires server version <8.1"); + return; + } + let Some(client) = set_up().await else { + return; + }; + + let coll = client.database("db").collection::("csfle"); + let pipeline = vec![ + doc! { "$match": { "csfle": "csfle" } }, + doc! { + "$lookup": { + "from": "no_schema", + "as": "matched", + "$pipeline": [ + { "$match": { "no_schema": "no_schema" } }, + { "$project": { "_id": 0 } }, + ] + } + }, + doc! { "$project": { "_id": 0 } }, + ]; + let error = coll.aggregate(pipeline).await.unwrap_err(); + assert!(error.message().unwrap().contains("Upgrade")); + } + + #[tokio::test] + async fn case_10() { + if server_version_lt(8, 2).await || mongocrypt_version_lt("1.17.0") { + log_uncaptured( + "skipping $lookup test: requires server version 8.2+, mongocrypt 1.17.0+", + ); + return; + } + + let coll = "qe"; + let pipeline = vec![ + doc! { "$match": { "qe": "qe" } }, + doc! { + "$lookup": { + "from": "non_csfle_schema", + "as": "matched", + "pipeline": [ + { "$match": { "non_csfle_schema": "non_csfle_schema" } }, + { "$project": { "_id": 0, "__safeContent__": 0 } }, + ] + } + }, + doc! { "$project": { "_id": 0, "__safeContent__": 0 } }, + ]; + let expected = + doc! { "qe": "qe", "matched": [ { "non_csfle_schema": "non_csfle_schema" } ] }; + test_successful_lookup(coll, pipeline, expected).await; + } +} + // FLE 2.0 Documentation Example #[tokio::test] async fn fle2_example() -> Result<()> { @@ -2270,10 +2680,7 @@ async fn encrypt_expression_with_options() { #[tokio::test] #[cfg(feature = "text-indexes-unstable")] async fn text_indexes_explicit_encryption() { - use crate::{ - client_encryption::{PrefixOptions, SubstringOptions, SuffixOptions, TextOptions}, - test::mongocrypt_version_lt, - }; + use crate::client_encryption::{PrefixOptions, SubstringOptions, SuffixOptions, TextOptions}; if server_version_lt(8, 2).await || topology_is_standalone().await diff --git a/src/test/spec/json/testdata/client-side-encryption/data/lookup/schema-non-csfle.json b/src/test/spec/json/testdata/client-side-encryption/data/lookup/schema-non-csfle.json new file mode 100644 index 000000000..3edd12c8f --- /dev/null +++ b/src/test/spec/json/testdata/client-side-encryption/data/lookup/schema-non-csfle.json @@ -0,0 +1,3 @@ +{ + "bsonType": "object" +}