77 "path/filepath"
88 "sort"
99 "sync"
10- "time"
1110
1211 "github.com/go-kit/log"
1312 "github.com/go-kit/log/level"
@@ -22,7 +21,6 @@ import (
2221 "github.com/prometheus/prometheus/model/labels"
2322 prom_storage "github.com/prometheus/prometheus/storage"
2423 "github.com/prometheus/prometheus/tsdb/chunkenc"
25- tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
2624 "github.com/thanos-io/objstore"
2725 storecache "github.com/thanos-io/thanos/pkg/store/cache"
2826 "github.com/thanos-io/thanos/pkg/store/storepb"
@@ -36,7 +34,6 @@ import (
3634 "github.com/cortexproject/cortex/pkg/storage/tsdb"
3735 "github.com/cortexproject/cortex/pkg/storage/tsdb/users"
3836 cortex_util "github.com/cortexproject/cortex/pkg/util"
39- "github.com/cortexproject/cortex/pkg/util/backoff"
4037 cortex_errors "github.com/cortexproject/cortex/pkg/util/errors"
4138 "github.com/cortexproject/cortex/pkg/util/spanlogger"
4239 "github.com/cortexproject/cortex/pkg/util/validation"
@@ -64,14 +61,13 @@ type ParquetBucketStores struct {
6461
6562 cortexBucketStoreMetrics * CortexBucketStoreMetrics
6663 userScanner users.Scanner
67- shardingStrategy ShardingStrategy
6864
6965 userTokenBucketsMu sync.RWMutex
7066 userTokenBuckets map [string ]* cortex_util.TokenBucket
7167}
7268
7369// newParquetBucketStores creates a new multi-tenant parquet bucket stores
74- func newParquetBucketStores (cfg tsdb.BlocksStorageConfig , shardingStrategy ShardingStrategy , bucketClient objstore.InstrumentedBucket , limits * validation.Overrides , logger log.Logger , reg prometheus.Registerer ) (* ParquetBucketStores , error ) {
70+ func newParquetBucketStores (cfg tsdb.BlocksStorageConfig , bucketClient objstore.InstrumentedBucket , limits * validation.Overrides , logger log.Logger , reg prometheus.Registerer ) (* ParquetBucketStores , error ) {
7571 // Create caching bucket client for parquet bucket stores
7672 cachingBucket , err := createCachingBucketClientForParquet (cfg , bucketClient , "parquet-storegateway" , logger , reg )
7773 if err != nil {
@@ -88,7 +84,6 @@ func newParquetBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy Shard
8884 chunksDecoder : schema .NewPrometheusParquetChunksDecoder (chunkenc .NewPool ()),
8985 inflightRequests : cortex_util .NewInflightRequestTracker (),
9086 cortexBucketStoreMetrics : NewCortexBucketStoreMetrics (reg ),
91- shardingStrategy : shardingStrategy ,
9287 userTokenBuckets : make (map [string ]* cortex_util.TokenBucket ),
9388 }
9489 u .userScanner , err = users .NewScanner (cfg .UsersScanner , bucketClient , logger , reg )
@@ -209,147 +204,14 @@ func (u *ParquetBucketStores) LabelValues(ctx context.Context, req *storepb.Labe
209204
210205// SyncBlocks implements BucketStores
211206func (u * ParquetBucketStores ) SyncBlocks (ctx context.Context ) error {
212- return u .syncUsersBlocksWithRetries (ctx , func (ctx context.Context , p * parquetBucketStore ) error {
213- return p .SyncBlocks (ctx )
214- })
207+ return nil
215208}
216209
217210// InitialSync implements BucketStores
218211func (u * ParquetBucketStores ) InitialSync (ctx context.Context ) error {
219- level .Info (u .logger ).Log ("msg" , "synchronizing Parquet blocks for all users" )
220-
221- if err := u .syncUsersBlocksWithRetries (ctx , func (ctx context.Context , p * parquetBucketStore ) error {
222- return p .InitialSync (ctx )
223- }); err != nil {
224- level .Warn (u .logger ).Log ("msg" , "failed to synchronize Parquet blocks" , "err" , err )
225- return err
226- }
227-
228- level .Info (u .logger ).Log ("msg" , "successfully synchronized Parquet blocks for all users" )
229212 return nil
230213}
231214
232- func (u * ParquetBucketStores ) syncUsersBlocksWithRetries (ctx context.Context , f func (context.Context , * parquetBucketStore ) error ) error {
233- retries := backoff .New (ctx , backoff.Config {
234- MinBackoff : 1 * time .Second ,
235- MaxBackoff : 10 * time .Second ,
236- MaxRetries : 3 ,
237- })
238-
239- var lastErr error
240- for retries .Ongoing () {
241- lastErr = u .syncUsersBlocks (ctx , f )
242- if lastErr == nil {
243- return nil
244- }
245-
246- retries .Wait ()
247- }
248-
249- if lastErr == nil {
250- return retries .Err ()
251- }
252-
253- return lastErr
254- }
255-
256- func (u * ParquetBucketStores ) syncUsersBlocks (ctx context.Context , f func (context.Context , * parquetBucketStore ) error ) (returnErr error ) {
257- defer func (start time.Time ) {
258- u .cortexBucketStoreMetrics .syncTimes .Observe (time .Since (start ).Seconds ())
259- if returnErr == nil {
260- u .cortexBucketStoreMetrics .syncLastSuccess .SetToCurrentTime ()
261- }
262- }(time .Now ())
263-
264- type job struct {
265- userID string
266- store * parquetBucketStore
267- }
268-
269- wg := & sync.WaitGroup {}
270- jobs := make (chan job )
271- errs := tsdb_errors .NewMulti ()
272- errsMx := sync.Mutex {}
273-
274- // Scan users in the bucket.
275- userIDs , err := u .scanUsers (ctx )
276- if err != nil {
277- return err
278- }
279-
280- includeUserIDs := make (map [string ]struct {})
281- for _ , userID := range u .shardingStrategy .FilterUsers (ctx , userIDs ) {
282- includeUserIDs [userID ] = struct {}{}
283- }
284-
285- u .cortexBucketStoreMetrics .tenantsDiscovered .Set (float64 (len (userIDs )))
286- u .cortexBucketStoreMetrics .tenantsSynced .Set (float64 (len (includeUserIDs )))
287-
288- // Create a pool of workers which will synchronize blocks. The pool size
289- // is limited in order to avoid to concurrently sync a lot of tenants in
290- // a large cluster.
291- for i := 0 ; i < u .cfg .BucketStore .TenantSyncConcurrency ; i ++ {
292- wg .Add (1 )
293- go func () {
294- defer wg .Done ()
295-
296- for job := range jobs {
297- if err := f (ctx , job .store ); err != nil {
298- if errors .Is (err , bucket .ErrCustomerManagedKeyAccessDenied ) {
299- u .storesErrorsMu .Lock ()
300- u .storesErrors [job .userID ] = httpgrpc .Errorf (int (codes .PermissionDenied ), "store error: %s" , err )
301- u .storesErrorsMu .Unlock ()
302- } else {
303- errsMx .Lock ()
304- errs .Add (errors .Wrapf (err , "failed to synchronize Parquet blocks for user %s" , job .userID ))
305- errsMx .Unlock ()
306- }
307- } else {
308- u .storesErrorsMu .Lock ()
309- delete (u .storesErrors , job .userID )
310- u .storesErrorsMu .Unlock ()
311- }
312- }
313- }()
314- }
315-
316- // Lazily create a bucket store for each new user found
317- // and submit a sync job for each user.
318- for _ , userID := range userIDs {
319- // If we don't have a store for the tenant yet, then we should skip it if it's not
320- // included in the store-gateway shard. If we already have it, we need to sync it
321- // anyway to make sure all its blocks are unloaded and metrics updated correctly
322- // (but bucket API calls are skipped thanks to the objstore client adapter).
323- if _ , included := includeUserIDs [userID ]; ! included && u .getStore (userID ) == nil {
324- continue
325- }
326-
327- bs , err := u .getOrCreateStore (userID )
328- if err != nil {
329- errsMx .Lock ()
330- errs .Add (err )
331- errsMx .Unlock ()
332-
333- continue
334- }
335-
336- select {
337- case jobs <- job {userID : userID , store : bs }:
338- // Nothing to do. Will loop to push more jobs.
339- case <- ctx .Done ():
340- return ctx .Err ()
341- }
342- }
343-
344- // Wait until all workers completed.
345- close (jobs )
346- wg .Wait ()
347-
348- u .deleteLocalFilesForExcludedTenants (includeUserIDs )
349-
350- return errs .Err ()
351- }
352-
353215// deleteLocalFilesForExcludedTenants removes local "sync" directories for tenants that are not included in the current
354216// shard.
355217func (u * ParquetBucketStores ) deleteLocalFilesForExcludedTenants (includeUserIDs map [string ]struct {}) {
0 commit comments