@@ -6,41 +6,123 @@ import (
66
77 "github.com/stretchr/testify/require"
88
9- "github.com/cortexproject/cortex/pkg/util/logical_plan "
9+ "github.com/cortexproject/cortex/pkg/distributed_execution "
1010)
1111
12+ // Tests fragmentation of logical plans, verifying that the fragments contain correct metadata.
13+ // Note: The number of fragments is determined by the distributed optimizer's strategy -
14+ // if the optimizer logic changes, this test will need to be updated accordingly.
1215func TestFragmenter (t * testing.T ) {
1316 type testCase struct {
14- name string
15- query string
16- start time.Time
17- end time.Time
18- expectedFragments int
17+ name string
18+ query string
19+ start time.Time
20+ end time.Time
21+ expectedFragmentsCnt int
1922 }
2023
2124 now := time .Now ()
22-
23- // more tests will be added when distributed optimizer and fragmenter are implemented
2425 tests := []testCase {
2526 {
26- name : "simple logical query plan - no fragmentation" ,
27- query : "up" ,
28- start : now ,
29- end : now ,
30- expectedFragments : 1 ,
27+ name : "simple logical query plan - no fragmentation" ,
28+ query : "up" ,
29+ start : now ,
30+ end : now ,
31+ expectedFragmentsCnt : 1 ,
32+ },
33+ {
34+ name : "binary operation with aggregations" ,
35+ query : "sum(rate(node_cpu_seconds_total{mode!=\" idle\" }[5m])) + sum(rate(node_memory_Active_bytes[5m]))" ,
36+ start : now ,
37+ end : now ,
38+ expectedFragmentsCnt : 3 ,
39+ },
40+ {
41+ name : "multiple binary operation with aggregations" ,
42+ query : "sum(rate(http_requests_total{job=\" api\" }[5m])) + sum(rate(http_requests_total{job=\" web\" }[5m])) + sum(rate(http_requests_total{job=\" cache\" }[5m]))" ,
43+ start : now ,
44+ end : now ,
45+ expectedFragmentsCnt : 5 ,
46+ },
47+ {
48+ name : "multiple binary operation with aggregations" ,
49+ query : "sum(rate(http_requests_total{job=\" api\" }[5m])) + sum(rate(http_requests_total{job=\" web\" }[5m])) + sum(rate(http_requests_total{job=\" cache\" }[5m])) + sum(rate(http_requests_total{job=\" db\" }[5m]))" ,
50+ start : now ,
51+ end : now ,
52+ expectedFragmentsCnt : 7 ,
3153 },
3254 }
3355
3456 for _ , tc := range tests {
3557 t .Run (tc .name , func (t * testing.T ) {
36- lp , err := logical_plan .CreateTestLogicalPlan (tc .query , tc .start , tc .end , 0 )
58+ lp , err := distributed_execution .CreateTestLogicalPlan (tc .query , tc .start , tc .end , 0 )
3759 require .NoError (t , err )
3860
39- fragmenter := NewDummyFragmenter ()
40- res , err := fragmenter .Fragment ((* lp ).Root ())
61+ fragmenter := NewPlanFragmenter ()
62+ res , err := fragmenter .Fragment (uint64 ( 1 ), (* lp ).Root ())
4163
4264 require .NoError (t , err )
43- require .Equal (t , tc .expectedFragments , len (res ))
65+
66+ // first check the number of fragments
67+ require .Equal (t , tc .expectedFragmentsCnt , len (res ))
68+
69+ // check the fragments returned by comparing child IDs, ensuring correct hierarchy
70+ if len (res ) == 3 { // 3 fragment cases
71+ // current binary split:
72+ // (due to the design of the distributed optimizer)
73+ // 2
74+ // / \
75+ // 0 1
76+ require .Empty (t , res [0 ].ChildIDs )
77+ require .Empty (t , res [1 ].ChildIDs )
78+ require .Equal (t , []uint64 {res [0 ].FragmentID , res [1 ].FragmentID }, res [2 ].ChildIDs )
79+
80+ } else if len (res ) == 5 {
81+ // current binary split:
82+ // 4
83+ // / \
84+ // 2 3
85+ // / \
86+ // 0 1
87+ require .Empty (t , res [0 ].ChildIDs )
88+ require .Empty (t , res [1 ].ChildIDs )
89+ require .Empty (t , res [3 ].ChildIDs )
90+
91+ require .Containsf (t , res [2 ].ChildIDs , res [0 ].FragmentID , "child ID of fragment 0 not found in layer 2" )
92+ require .Containsf (t , res [2 ].ChildIDs , res [1 ].FragmentID , "child ID of fragment 1 not found in layer 2" )
93+ require .Equal (t , len (res [2 ].ChildIDs ), 2 ) // binary check
94+
95+ require .Containsf (t , res [4 ].ChildIDs , res [3 ].FragmentID , "child ID of fragment 3 not found in layer 3" )
96+ require .Containsf (t , res [4 ].ChildIDs , res [2 ].FragmentID , "child ID of fragment 4 not found in layer 3" )
97+ require .Equal (t , len (res [4 ].ChildIDs ), 2 ) // binary check
98+
99+ } else if len (res ) == 7 { // 7 fragment cases
100+ // current binary split:
101+ // 6
102+ // / \
103+ // 4 5
104+ // / \
105+ // 2 3
106+ // / \
107+ // 0 1
108+
109+ require .Empty (t , res [0 ].ChildIDs )
110+ require .Empty (t , res [1 ].ChildIDs )
111+ require .Empty (t , res [3 ].ChildIDs )
112+ require .Empty (t , res [5 ].ChildIDs )
113+
114+ require .Containsf (t , res [2 ].ChildIDs , res [0 ].FragmentID , "child ID of fragment 0 not found in layer 2" )
115+ require .Containsf (t , res [2 ].ChildIDs , res [1 ].FragmentID , "child ID of fragment 1 not found in layer 2" )
116+ require .Equal (t , len (res [2 ].ChildIDs ), 2 ) // binary check
117+
118+ require .Containsf (t , res [4 ].ChildIDs , res [3 ].FragmentID , "child ID of fragment 3 not found in layer 3" )
119+ require .Containsf (t , res [4 ].ChildIDs , res [2 ].FragmentID , "child ID of fragment 4 not found in layer 3" )
120+ require .Equal (t , len (res [4 ].ChildIDs ), 2 ) // binary check
121+
122+ require .Containsf (t , res [6 ].ChildIDs , res [4 ].FragmentID , "child ID of fragment 4 not found in layer 4" )
123+ require .Containsf (t , res [6 ].ChildIDs , res [5 ].FragmentID , "child ID of fragment 5 not found in layer 4" )
124+ require .Equal (t , len (res [6 ].ChildIDs ), 2 ) // binary check
125+ }
44126 })
45127 }
46128}
0 commit comments