Skip to content

Commit abcbab4

Browse files
authored
[Bugfix] [Platform] Ensure Inventory picks active leader (#1986)
1 parent b6b5275 commit abcbab4

File tree

5 files changed

+51
-197
lines changed

5 files changed

+51
-197
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
- (Feature) (License) Activation API Integration
1313
- (Feature) (Platform) Chart & Service Kubernetes Events
1414
- (Feature) (Platform) Registry Secret
15+
- (Bugfix) (Platform) Ensure Inventory picks active leader
1516

1617
## [1.3.1](https://github.com/arangodb/kube-arangodb/tree/1.3.1) (2025-10-07)
1718
- (Documentation) Add ArangoPlatformStorage Docs & Examples

pkg/deployment/reconcile/plan_builder_license.go

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ package reconcile
2222

2323
import (
2424
"context"
25+
"math/rand"
2526
"time"
2627

2728
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
@@ -117,6 +118,29 @@ func (r *Reconciler) updateClusterLicenseDiscover(spec api.DeploymentSpec, conte
117118
return "", errors.Errorf("Unable to discover License mode")
118119
}
119120

121+
func (r *Reconciler) updateClusterLicenseMember(status api.DeploymentStatus) (api.DeploymentStatusMemberElement, bool) {
122+
members := status.Members.AsListInGroups(arangod.GroupsWithLicenseV2()...).Filter(func(a api.DeploymentStatusMemberElement) bool {
123+
i := a.Member.Image
124+
if i == nil {
125+
return false
126+
}
127+
128+
return i.ArangoDBVersion.CompareTo("3.9.0") >= 0 && i.Enterprise
129+
}).Filter(func(a api.DeploymentStatusMemberElement) bool {
130+
return a.Member.Conditions.IsTrue(api.ConditionTypeReady)
131+
})
132+
133+
if len(members) == 0 {
134+
return api.DeploymentStatusMemberElement{}, false
135+
}
136+
137+
if len(members) == 1 {
138+
return members[0], true
139+
}
140+
141+
return members[rand.Intn(len(members))], true
142+
}
143+
120144
func (r *Reconciler) updateClusterLicenseKey(ctx context.Context, spec api.DeploymentSpec, status api.DeploymentStatus, context PlanBuilderContext) api.Plan {
121145
l, err := k8sutil.GetLicenseFromSecret(context.ACS().CurrentClusterCache(), spec.License.GetSecretName())
122146
if err != nil {
@@ -129,23 +153,14 @@ func (r *Reconciler) updateClusterLicenseKey(ctx context.Context, spec api.Deplo
129153
return nil
130154
}
131155

132-
members := status.Members.AsListInGroups(arangod.GroupsWithLicenseV2()...).Filter(func(a api.DeploymentStatusMemberElement) bool {
133-
i := a.Member.Image
134-
if i == nil {
135-
return false
136-
}
156+
member, ok := r.updateClusterLicenseMember(status)
137157

138-
return i.ArangoDBVersion.CompareTo("3.9.0") >= 0 && i.Enterprise
139-
})
140-
141-
if len(members) == 0 {
158+
if !ok {
142159
// No member found to take this action
143160
r.log.Trace("No enterprise member in version 3.9.0 or above")
144161
return nil
145162
}
146163

147-
member := members[0]
148-
149164
ctxChild, cancel := globals.GetGlobals().Timeouts().ArangoD().WithTimeout(ctx)
150165
defer cancel()
151166

@@ -187,23 +202,14 @@ func (r *Reconciler) updateClusterLicenseAPI(ctx context.Context, spec api.Deplo
187202
return nil
188203
}
189204

190-
members := status.Members.AsListInGroups(api.ServerGroupCoordinators, api.ServerGroupSingle).Filter(func(a api.DeploymentStatusMemberElement) bool {
191-
i := a.Member.Image
192-
if i == nil {
193-
return false
194-
}
195-
196-
return i.ArangoDBVersion.CompareTo("3.9.0") >= 0 && i.Enterprise
197-
})
205+
member, ok := r.updateClusterLicenseMember(status)
198206

199-
if len(members) == 0 {
207+
if !ok {
200208
// No member found to take this action
201209
r.log.Trace("No enterprise member in version 3.9.0 or above")
202210
return nil
203211
}
204212

205-
member := members[0]
206-
207213
ctxChild, cancel := globals.GetGlobals().Timeouts().ArangoD().WithTimeout(ctx)
208214
defer cancel()
209215

pkg/deployment/resources/pod_inspector.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,25 @@ func (r *Resources) InspectPods(ctx context.Context, cachedStatus inspectorInter
253253
}
254254
}
255255

256+
if spec.Mode.Get() == api.DeploymentModeActiveFailover && features.FailoverLeadership().Enabled() && group == api.ServerGroupSingle {
257+
s, ok := r.context.GetMembersState().MemberState(memberStatus.ID)
258+
if ok && s.IsServing() {
259+
if v, ok := pod.Labels[k8sutil.LabelKeyArangoLeader]; !ok || v != k8sutil.LabelValueArangoActive {
260+
pod.Labels[k8sutil.LabelKeyArangoLeader] = k8sutil.LabelValueArangoActive
261+
if err := r.context.ApplyPatchOnPod(ctx, pod, patch.ItemReplace(patch.NewPath("metadata", "labels"), pod.Labels)); err != nil {
262+
log.Str("pod-name", pod.GetName()).Err(err).Error("Unable to update labels")
263+
}
264+
}
265+
} else {
266+
if _, ok := pod.Labels[k8sutil.LabelKeyArangoLeader]; ok {
267+
delete(pod.Labels, k8sutil.LabelKeyArangoLeader)
268+
if err := r.context.ApplyPatchOnPod(ctx, pod, patch.ItemReplace(patch.NewPath("metadata", "labels"), pod.Labels)); err != nil {
269+
log.Str("pod-name", pod.GetName()).Err(err).Error("Unable to update labels")
270+
}
271+
}
272+
}
273+
}
274+
256275
if memberStatus.Conditions.IsTrue(api.ConditionTypeActive) {
257276
if v, ok := pod.Labels[k8sutil.LabelKeyArangoActive]; !ok || v != k8sutil.LabelValueArangoActive {
258277
pod.Labels[k8sutil.LabelKeyArangoActive] = k8sutil.LabelValueArangoActive

pkg/deployment/resources/pod_leader.go

Lines changed: 2 additions & 174 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,12 @@ package resources
2222

2323
import (
2424
"context"
25-
goStrings "strings"
26-
"sync"
2725

2826
core "k8s.io/api/core/v1"
2927
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
30-
"k8s.io/apimachinery/pkg/types"
3128

3229
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
33-
"github.com/arangodb/kube-arangodb/pkg/deployment/features"
3430
"github.com/arangodb/kube-arangodb/pkg/deployment/patch"
35-
"github.com/arangodb/kube-arangodb/pkg/util/arangod"
3631
"github.com/arangodb/kube-arangodb/pkg/util/errors"
3732
"github.com/arangodb/kube-arangodb/pkg/util/globals"
3833
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
@@ -46,7 +41,7 @@ import (
4641
// consequentially service will not point to any pod.
4742
// It works only in active fail-over mode.
4843
func (r *Resources) EnsureLeader(ctx context.Context, cachedStatus inspectorInterface.Inspector) error {
49-
if r.context.GetSpec().GetMode() != api.DeploymentModeActiveFailover {
44+
if !r.context.GetSpec().GetMode().HasAgents() {
5045
return nil
5146
}
5247

@@ -129,7 +124,7 @@ func (r *Resources) EnsureLeader(ctx context.Context, cachedStatus inspectorInte
129124
return err
130125
} else {
131126
if !c {
132-
return r.ensureSingleServerLeader(ctx, cachedStatus)
127+
return nil
133128
}
134129

135130
return errors.Reconcile()
@@ -150,170 +145,3 @@ func (r *Resources) EnsureLeader(ctx context.Context, cachedStatus inspectorInte
150145
// The service has been created.
151146
return errors.Reconcile()
152147
}
153-
154-
// getSingleServerLeaderID returns ids of a single server leaders.
155-
func (r *Resources) getSingleServerLeaderID(ctx context.Context) ([]string, error) {
156-
status := r.context.GetStatus()
157-
var mutex sync.Mutex
158-
var leaderIDs []string
159-
var anyError error
160-
161-
ctxCancel, cancel := context.WithCancel(ctx)
162-
defer cancel()
163-
164-
var wg sync.WaitGroup
165-
for _, m := range status.Members.Single {
166-
wg.Add(1)
167-
go func(id string) {
168-
defer wg.Done()
169-
err := globals.GetGlobalTimeouts().ArangoD().RunWithTimeout(ctxCancel, func(ctxChild context.Context) error {
170-
c, err := r.context.GetMembersState().GetMemberClient(id)
171-
if err != nil {
172-
return err
173-
}
174-
175-
if available, err := arangod.IsServerAvailable(ctxChild, c); err != nil {
176-
return err
177-
} else if !available {
178-
return errors.New("not available")
179-
}
180-
181-
mutex.Lock()
182-
leaderIDs = append(leaderIDs, id)
183-
mutex.Unlock()
184-
return nil
185-
})
186-
187-
if err != nil {
188-
mutex.Lock()
189-
anyError = err
190-
mutex.Unlock()
191-
}
192-
}(m.ID)
193-
}
194-
wg.Wait()
195-
196-
if len(leaderIDs) > 0 {
197-
return leaderIDs, nil
198-
}
199-
200-
if anyError != nil {
201-
return nil, errors.WithMessagef(anyError, "unable to get a leader")
202-
}
203-
204-
return nil, errors.New("unable to get a leader")
205-
}
206-
207-
// setSingleServerLeadership adds or removes leadership label on a single server pod.
208-
func (r *Resources) ensureSingleServerLeader(ctx context.Context, cachedStatus inspectorInterface.Inspector) error {
209-
changed := false
210-
211-
enabled := features.FailoverLeadership().Enabled()
212-
var leaderID string
213-
if enabled {
214-
leaderIDs, err := r.getSingleServerLeaderID(ctx)
215-
if err != nil {
216-
return err
217-
}
218-
219-
if len(leaderIDs) == 1 {
220-
leaderID = leaderIDs[0]
221-
} else if len(leaderIDs) > 1 {
222-
r.log.Error("multiple leaders found: %s. Blocking traffic to the deployment services", goStrings.Join(leaderIDs, ", "))
223-
}
224-
}
225-
226-
status := r.context.GetStatus()
227-
for _, m := range status.Members.Single {
228-
pod, exist := cachedStatus.Pod().V1().GetSimple(m.Pod.GetName())
229-
if !exist {
230-
continue
231-
}
232-
233-
labels := pod.GetLabels()
234-
if enabled && m.ID == leaderID {
235-
if value, ok := labels[k8sutil.LabelKeyArangoLeader]; ok && value == "true" {
236-
// Single server is available, and it has a leader label.
237-
continue
238-
}
239-
240-
labels = addLabel(labels, k8sutil.LabelKeyArangoLeader, "true")
241-
} else {
242-
if _, ok := labels[k8sutil.LabelKeyArangoLeader]; !ok {
243-
// Single server is not available, and it does not have a leader label.
244-
continue
245-
}
246-
247-
delete(labels, k8sutil.LabelKeyArangoLeader)
248-
}
249-
250-
err := r.context.ApplyPatchOnPod(ctx, pod, patch.ItemReplace(patch.NewPath("metadata", "labels"), labels))
251-
if err != nil {
252-
return errors.WithMessagef(err, "unable to change leader label for pod %s", m.Pod.GetName())
253-
}
254-
changed = true
255-
}
256-
257-
if changed {
258-
return errors.Reconcile()
259-
}
260-
261-
return r.ensureSingleServerLeaderServices(ctx, cachedStatus)
262-
}
263-
264-
// ensureSingleServerLeaderServices adds a leadership label to deployment service and external deployment service.
265-
func (r *Resources) ensureSingleServerLeaderServices(ctx context.Context, cachedStatus inspectorInterface.Inspector) error {
266-
// Add a leadership label to deployment service and external deployment service.
267-
deploymentName := r.context.GetAPIObject().GetName()
268-
changed := false
269-
services := []string{
270-
k8sutil.CreateDatabaseClientServiceName(deploymentName),
271-
k8sutil.CreateDatabaseExternalAccessServiceName(deploymentName),
272-
}
273-
274-
enabled := features.FailoverLeadership().Enabled()
275-
for _, svcName := range services {
276-
svc, exists := cachedStatus.Service().V1().GetSimple(svcName)
277-
if !exists {
278-
// It will be created later with a leadership label.
279-
continue
280-
}
281-
selector := svc.Spec.Selector
282-
if enabled {
283-
if v, ok := selector[k8sutil.LabelKeyArangoLeader]; ok && v == "true" {
284-
// It is already OK.
285-
continue
286-
}
287-
288-
selector = addLabel(selector, k8sutil.LabelKeyArangoLeader, "true")
289-
} else {
290-
if _, ok := selector[k8sutil.LabelKeyArangoLeader]; !ok {
291-
// Service does not have a leader label, and it should not have.
292-
continue
293-
}
294-
295-
delete(selector, k8sutil.LabelKeyArangoLeader)
296-
}
297-
298-
parser := patch.Patch([]patch.Item{patch.ItemReplace(patch.NewPath("spec", "selector"), selector)})
299-
data, err := parser.Marshal()
300-
if err != nil {
301-
return errors.WithMessagef(err, "unable to marshal labels for service %s", svcName)
302-
}
303-
304-
err = globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
305-
_, err := cachedStatus.ServicesModInterface().V1().Patch(ctxChild, svcName, types.JSONPatchType, data, meta.PatchOptions{})
306-
return err
307-
})
308-
if err != nil {
309-
return errors.WithMessagef(err, "unable to patch labels for service %s", svcName)
310-
}
311-
changed = true
312-
}
313-
314-
if changed {
315-
return errors.Reconcile()
316-
}
317-
318-
return nil
319-
}

pkg/util/k8sutil/services.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func CreateExporterClientServiceName(deploymentName string) string {
7272

7373
// CreateAgentLeaderServiceName returns the name of the service used to access a leader agent.
7474
func CreateAgentLeaderServiceName(deploymentName string) string {
75-
return deploymentName + "-agent"
75+
return deploymentName + "-agent-leader"
7676
}
7777

7878
// CreateExporterService

0 commit comments

Comments
 (0)