|
4 | 4 | import com.github.cloudyrock.mongock.ChangeSet; |
5 | 5 | import com.github.cloudyrock.mongock.driver.mongodb.springdata.v4.decorator.impl.MongockTemplate; |
6 | 6 | import com.github.f4b6a3.uuid.UuidCreator; |
| 7 | +import com.mongodb.client.MongoCollection; |
| 8 | +import com.mongodb.client.MongoCursor; |
| 9 | +import com.mongodb.client.result.DeleteResult; |
7 | 10 | import lombok.extern.slf4j.Slf4j; |
8 | 11 | import org.bson.Document; |
9 | 12 | import org.lowcoder.domain.application.model.Application; |
|
44 | 47 |
|
45 | 48 | import java.time.Instant; |
46 | 49 | import java.time.temporal.ChronoUnit; |
| 50 | +import java.util.Arrays; |
47 | 51 | import java.util.List; |
48 | 52 | import java.util.Set; |
49 | 53 |
|
@@ -313,41 +317,86 @@ private int getMongoDBVersion(MongockTemplate mongoTemplate) { |
313 | 317 | @ChangeSet(order = "026", id = "add-time-series-snapshot-history", author = "") |
314 | 318 | public void addTimeSeriesSnapshotHistory(MongockTemplate mongoTemplate, CommonConfig commonConfig) { |
315 | 319 | int mongoVersion = getMongoDBVersion(mongoTemplate); |
316 | | - if (mongoVersion < 5) { |
317 | | - log.warn("MongoDB version is below 5. Time-series collections are not supported. Upgrade the MongoDB version."); |
318 | | - } |
319 | 320 |
|
320 | | - // Create the time-series collection if it doesn't exist |
321 | | - if (!mongoTemplate.collectionExists(ApplicationHistorySnapshotTS.class)) { |
322 | | - if(mongoVersion < 5) { |
323 | | - mongoTemplate.createCollection(ApplicationHistorySnapshotTS.class); |
324 | | - } else { |
325 | | - mongoTemplate.createCollection(ApplicationHistorySnapshotTS.class, CollectionOptions.empty().timeSeries("createdAt")); |
| 321 | + Instant thresholdDate = Instant.now().minus(commonConfig.getQuery().getAppSnapshotKeepDuration(), ChronoUnit.DAYS); |
| 322 | + |
| 323 | + if (mongoVersion >= 5) { |
| 324 | + // MongoDB version >= 5: Use manual insert query |
| 325 | + if (!mongoTemplate.collectionExists(ApplicationHistorySnapshotTS.class)) { |
| 326 | + mongoTemplate.createCollection(ApplicationHistorySnapshotTS.class, |
| 327 | + CollectionOptions.empty().timeSeries("createdAt")); |
326 | 328 | } |
| 329 | + |
| 330 | + // Aggregation pipeline to fetch the records |
| 331 | + List<Document> aggregationPipeline = Arrays.asList( |
| 332 | + new Document("$match", new Document("createdAt", new Document("$gte", thresholdDate))), |
| 333 | + new Document("$project", new Document() |
| 334 | + .append("applicationId", 1) |
| 335 | + .append("dsl", 1) |
| 336 | + .append("context", 1) |
| 337 | + .append("createdAt", 1) |
| 338 | + .append("createdBy", 1) |
| 339 | + .append("modifiedBy", 1) |
| 340 | + .append("updatedAt", 1) |
| 341 | + .append("id", "$_id")) // Map `_id` to `id` if needed |
| 342 | + ); |
| 343 | + |
| 344 | + MongoCollection<Document> sourceCollection = mongoTemplate.getDb().getCollection("applicationHistorySnapshot"); |
| 345 | + MongoCollection<Document> targetCollection = mongoTemplate.getDb().getCollection("applicationHistorySnapshotTS"); |
| 346 | + |
| 347 | + // Fetch results and insert them into the time-series collection |
| 348 | + try (MongoCursor<Document> cursor = sourceCollection.aggregate(aggregationPipeline).iterator()) { |
| 349 | + while (cursor.hasNext()) { |
| 350 | + Document document = cursor.next(); |
| 351 | + targetCollection.insertOne(document); // Insert into the time-series collection |
| 352 | + } |
| 353 | + } |
| 354 | + |
| 355 | + // Delete the migrated records |
| 356 | + Query deleteQuery = new Query(Criteria.where("createdAt").gte(thresholdDate)); |
| 357 | + DeleteResult deleteResult = mongoTemplate.remove(deleteQuery, ApplicationHistorySnapshot.class); |
| 358 | + |
| 359 | + log.info("Deleted {} records from the source collection.", deleteResult.getDeletedCount()); |
| 360 | + } else { |
| 361 | + // MongoDB version < 5: Use aggregation with $out |
| 362 | + if (!mongoTemplate.collectionExists(ApplicationHistorySnapshotTS.class)) { |
| 363 | + mongoTemplate.createCollection(ApplicationHistorySnapshotTS.class); // Create a regular collection |
| 364 | + } |
| 365 | + |
| 366 | + // Aggregation pipeline with $out |
| 367 | + List<Document> aggregationPipeline = Arrays.asList( |
| 368 | + new Document("$match", new Document("createdAt", new Document("$gte", thresholdDate))), |
| 369 | + new Document("$project", new Document() |
| 370 | + .append("applicationId", 1) |
| 371 | + .append("dsl", 1) |
| 372 | + .append("context", 1) |
| 373 | + .append("createdAt", 1) |
| 374 | + .append("createdBy", 1) |
| 375 | + .append("modifiedBy", 1) |
| 376 | + .append("updatedAt", 1) |
| 377 | + .append("id", "$_id")), // Map `_id` to `id` if needed |
| 378 | + new Document("$out", "applicationHistorySnapshotTS") // Write directly to the target collection |
| 379 | + ); |
| 380 | + |
| 381 | + mongoTemplate.getDb() |
| 382 | + .getCollection("applicationHistorySnapshot") |
| 383 | + .aggregate(aggregationPipeline) |
| 384 | + .toCollection(); |
| 385 | + |
| 386 | + // Delete the migrated records |
| 387 | + Query deleteQuery = new Query(Criteria.where("createdAt").gte(thresholdDate)); |
| 388 | + DeleteResult deleteResult = mongoTemplate.remove(deleteQuery, ApplicationHistorySnapshot.class); |
| 389 | + |
| 390 | + log.info("Deleted {} records from the source collection.", deleteResult.getDeletedCount()); |
327 | 391 | } |
328 | | - Instant thresholdDate = Instant.now().minus(commonConfig.getQuery().getAppSnapshotKeepDuration(), ChronoUnit.DAYS); |
329 | | - List<ApplicationHistorySnapshot> snapshots = mongoTemplate.find(new Query().addCriteria(Criteria.where("createdAt").gte(thresholdDate)), ApplicationHistorySnapshot.class); |
330 | | - snapshots.forEach(snapshot -> { |
331 | | - ApplicationHistorySnapshotTS applicationHistorySnapshotTS = new ApplicationHistorySnapshotTS(); |
332 | | - applicationHistorySnapshotTS.setApplicationId(snapshot.getApplicationId()); |
333 | | - applicationHistorySnapshotTS.setDsl(snapshot.getDsl()); |
334 | | - applicationHistorySnapshotTS.setContext(snapshot.getContext()); |
335 | | - applicationHistorySnapshotTS.setCreatedAt(snapshot.getCreatedAt()); |
336 | | - applicationHistorySnapshotTS.setCreatedBy(snapshot.getCreatedBy()); |
337 | | - applicationHistorySnapshotTS.setModifiedBy(snapshot.getModifiedBy()); |
338 | | - applicationHistorySnapshotTS.setUpdatedAt(snapshot.getUpdatedAt()); |
339 | | - applicationHistorySnapshotTS.setId(snapshot.getId()); |
340 | | - mongoTemplate.insert(applicationHistorySnapshotTS); |
341 | | - mongoTemplate.remove(snapshot); |
342 | | - }); |
343 | 392 |
|
344 | | - // Ensure indexes if needed |
| 393 | + // Ensure indexes on the new collection |
345 | 394 | ensureIndexes(mongoTemplate, ApplicationHistorySnapshotTS.class, |
346 | 395 | makeIndex("applicationId"), |
347 | | - makeIndex("createdAt") |
348 | | - ); |
| 396 | + makeIndex("createdAt")); |
349 | 397 | } |
350 | 398 |
|
| 399 | + |
351 | 400 | private void addGidField(MongockTemplate mongoTemplate, String collectionName) { |
352 | 401 | // Create a query to match all documents |
353 | 402 | Query query = new Query(); |
|
0 commit comments