@@ -2,10 +2,12 @@ package parquetconverter
22
33import (
44 "context"
5+ "errors"
56 "fmt"
67 "io"
78 "math/rand"
89 "path"
10+ "strings"
911 "testing"
1012 "time"
1113
@@ -21,6 +23,8 @@ import (
2123 "github.com/thanos-io/objstore/providers/filesystem"
2224 "github.com/thanos-io/thanos/pkg/block"
2325 "github.com/thanos-io/thanos/pkg/block/metadata"
26+ "google.golang.org/grpc/codes"
27+ "google.golang.org/grpc/status"
2428
2529 "github.com/cortexproject/cortex/integration/e2e"
2630 "github.com/cortexproject/cortex/pkg/ring"
@@ -31,6 +35,7 @@ import (
3135 "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
3236 "github.com/cortexproject/cortex/pkg/storage/tsdb/users"
3337 "github.com/cortexproject/cortex/pkg/util/concurrency"
38+ cortex_errors "github.com/cortexproject/cortex/pkg/util/errors"
3439 "github.com/cortexproject/cortex/pkg/util/flagext"
3540 "github.com/cortexproject/cortex/pkg/util/services"
3641 "github.com/cortexproject/cortex/pkg/util/test"
@@ -269,7 +274,8 @@ func TestConverter_BlockConversionFailure(t *testing.T) {
269274
270275 // Create a mock bucket that wraps the filesystem bucket but fails uploads
271276 mockBucket := & mockBucket {
272- Bucket : fsBucket ,
277+ Bucket : fsBucket ,
278+ uploadFailure : fmt .Errorf ("mock upload failure" ),
273279 }
274280
275281 converter := newConverter (cfg , objstore .WithNoopInstr (mockBucket ), storageCfg , []int64 {3600000 , 7200000 }, nil , reg , overrides , nil )
@@ -284,13 +290,94 @@ func TestConverter_BlockConversionFailure(t *testing.T) {
284290 assert .Equal (t , 1.0 , testutil .ToFloat64 (converter .metrics .convertBlockFailures .WithLabelValues (userID )))
285291}
286292
293+ func TestConverter_ShouldNotFailOnAccessDenyError (t * testing.T ) {
294+ // Create a new registry for testing
295+ reg := prometheus .NewRegistry ()
296+
297+ // Create a new converter with test configuration
298+ cfg := Config {
299+ MetaSyncConcurrency : 1 ,
300+ DataDir : t .TempDir (),
301+ }
302+ logger := log .NewNopLogger ()
303+ storageCfg := cortex_tsdb.BlocksStorageConfig {}
304+ flagext .DefaultValues (& storageCfg )
305+ limits := & validation.Limits {}
306+ flagext .DefaultValues (limits )
307+ overrides := validation .NewOverrides (* limits , nil )
308+ limits .ParquetConverterEnabled = true
309+
310+ // Create a filesystem bucket for initial block upload
311+ fsBucket , err := filesystem .NewBucket (t .TempDir ())
312+ require .NoError (t , err )
313+
314+ // Create test labels
315+ lbls := labels.Labels {labels.Label {
316+ Name : "__name__" ,
317+ Value : "test" ,
318+ }}
319+
320+ // Create a real TSDB block
321+ dir := t .TempDir ()
322+ rnd := rand .New (rand .NewSource (time .Now ().Unix ()))
323+ blockID , err := e2e .CreateBlock (context .Background (), rnd , dir , []labels.Labels {lbls }, 2 , 0 , 2 * time .Hour .Milliseconds (), time .Minute .Milliseconds (), 10 )
324+ require .NoError (t , err )
325+ bdir := path .Join (dir , blockID .String ())
326+
327+ userID := "test-user"
328+
329+ // Upload the block to filesystem bucket
330+ err = block .Upload (context .Background (), logger , bucket .NewPrefixedBucketClient (fsBucket , userID ), bdir , metadata .NoneFunc )
331+ require .NoError (t , err )
332+
333+ var mb * mockBucket
334+ t .Run ("get failure" , func (t * testing.T ) {
335+ // Create a mock bucket that wraps the filesystem bucket but fails with permission denied error.
336+ mb = & mockBucket {
337+ Bucket : fsBucket ,
338+ getFailure : cortex_errors .WithCause (errors .New ("dummy error" ), status .Error (codes .PermissionDenied , "dummy" )),
339+ }
340+ })
341+
342+ t .Run ("upload failure" , func (t * testing.T ) {
343+ // Create a mock bucket that wraps the filesystem bucket but fails with permission denied error.
344+ mb = & mockBucket {
345+ Bucket : fsBucket ,
346+ uploadFailure : cortex_errors .WithCause (errors .New ("dummy error" ), status .Error (codes .PermissionDenied , "dummy" )),
347+ }
348+ })
349+
350+ converter := newConverter (cfg , objstore .WithNoopInstr (mb ), storageCfg , []int64 {3600000 , 7200000 }, nil , reg , overrides , nil )
351+ converter .ringLifecycler = & ring.Lifecycler {
352+ Addr : "1.2.3.4" ,
353+ }
354+
355+ err = converter .convertUser (context .Background (), logger , & RingMock {ReadRing : & ring.Ring {}}, userID )
356+ require .Error (t , err )
357+
358+ // Verify the failure metric was not incremented
359+ assert .Equal (t , 0.0 , testutil .ToFloat64 (converter .metrics .convertBlockFailures .WithLabelValues (userID )))
360+ }
361+
287362// mockBucket implements objstore.Bucket for testing
288363type mockBucket struct {
289364 objstore.Bucket
365+ uploadFailure error
366+ getFailure error
290367}
291368
292369func (m * mockBucket ) Upload (ctx context.Context , name string , r io.Reader ) error {
293- return fmt .Errorf ("mock upload failure" )
370+ if m .uploadFailure != nil {
371+ return m .uploadFailure
372+ }
373+ return m .Bucket .Upload (ctx , name , r )
374+ }
375+
376+ func (m * mockBucket ) Get (ctx context.Context , name string ) (io.ReadCloser , error ) {
377+ if m .getFailure != nil && strings .Contains (name , "index" ) {
378+ return nil , m .getFailure
379+ }
380+ return m .Bucket .Get (ctx , name )
294381}
295382
296383type RingMock struct {
0 commit comments