diff --git a/internal/orchestrator/metrichandler.go b/internal/orchestrator/metrichandler.go index 20784bc..02fe72d 100644 --- a/internal/orchestrator/metrichandler.go +++ b/internal/orchestrator/metrichandler.go @@ -6,7 +6,6 @@ import ( "strconv" "strings" - "github.com/samber/lo" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" @@ -94,41 +93,43 @@ func (h *MetricHandler) projectionsMonitor(ctx context.Context, list *unstructur // Add base dimensions only if they have a non-empty value h.setDataPointBaseDimensions(dataPoint) - // Add projected dimensions for this specific group - for _, pField := range group { - // Add projected dimension only if the value is non-empty and no error occurred - if pField.error == nil && pField.value != "" { - dataPoint.AddDimension(pField.name, pField.value) - } else { - // Optionally log or handle projection errors - recordErrors = append(recordErrors, fmt.Errorf("projection error for %s: %w", pField.name, pField.error)) + for _, inGroup := range group { + for _, pField := range inGroup { + // Add projected dimension only if the value is non-empty and no error occurred + if pField.error == nil && pField.value != "" { + dataPoint.AddDimension(pField.name, pField.value) + } else { + // Optionally log or handle projection errors + recordErrors = append(recordErrors, fmt.Errorf("projection error for %s: %w", pField.name, pField.error)) + } } + + dataPoints = append(dataPoints, dataPoint) } - dataPoints = append(dataPoints, dataPoint) - } - // Record all collected data points - errRecord := h.gaugeMetric.RecordMetrics(ctx, dataPoints...) - if errRecord != nil { - recordErrors = append(recordErrors, errRecord) - } + // Record all collected data points + errRecord := h.gaugeMetric.RecordMetrics(ctx, dataPoints...) + if errRecord != nil { + recordErrors = append(recordErrors, errRecord) + } - // Update result based on errors during projection or recording - if len(recordErrors) > 0 { - // Combine errors for reporting - combinedError := fmt.Errorf("errors during metric recording: %v", recordErrors) - result.Error = combinedError - result.Phase = v1alpha1.PhaseFailed - result.Reason = "RecordMetricFailed" - result.Message = fmt.Sprintf("failed to record metric value(s): %s", combinedError.Error()) - } else { - result.Phase = v1alpha1.PhaseActive - result.Reason = v1alpha1.ReasonMonitoringActive - result.Message = fmt.Sprintf("metric values recorded for resource '%s'", h.metric.GvkToString()) - // Observation might need adjustment depending on how results should be represented in status - result.Observation = &v1alpha1.MetricObservation{Timestamp: metav1.Now(), LatestValue: strconv.Itoa(len(list.Items))} // Report total count for now - } - // Return the result, error indicates failure in Monitor execution, not necessarily metric export failure (handled by controller) + // Update result based on errors during projection or recording + if len(recordErrors) > 0 { + // Combine errors for reporting + combinedError := fmt.Errorf("errors during metric recording: %v", recordErrors) + result.Error = combinedError + result.Phase = v1alpha1.PhaseFailed + result.Reason = "RecordMetricFailed" + result.Message = fmt.Sprintf("failed to record metric value(s): %s", combinedError.Error()) + } else { + result.Phase = v1alpha1.PhaseActive + result.Reason = v1alpha1.ReasonMonitoringActive + result.Message = fmt.Sprintf("metric values recorded for resource '%s'", h.metric.GvkToString()) + // Observation might need adjustment depending on how results should be represented in status + result.Observation = &v1alpha1.MetricObservation{Timestamp: metav1.Now(), LatestValue: strconv.Itoa(len(list.Items))} // Report total count for now + } + // Return the result, error indicates failure in Monitor execution, not necessarily metric export failure (handled by controller) + } return result, nil } @@ -158,27 +159,31 @@ func (e *projectedField) GetID() string { return fmt.Sprintf("%s: %s", e.name, e.value) } -func (h *MetricHandler) extractProjectionGroupsFrom(list *unstructured.UnstructuredList) map[string][]projectedField { - // note: for now we only allow one projection, so we can use the first one - // the reason for this is that if we have multiple projections, we need to create a cartesian product of all projections - // this is to be done at a later time - var collection []projectedField +func (h *MetricHandler) extractProjectionGroupsFrom(list *unstructured.UnstructuredList) map[string][][]projectedField { + collection := make([][]projectedField, 0, len(list.Items)) for _, obj := range list.Items { - - projection := lo.FirstOr(h.metric.Spec.Projections, v1alpha1.Projection{}) - - if projection.Name != "" && projection.FieldPath != "" { - name := projection.Name - value, found, err := nestedPrimitiveValue(obj, projection.FieldPath) - collection = append(collection, projectedField{name: name, value: value, found: found, error: err}) + var fields []projectedField + for _, projection := range h.metric.Spec.Projections { + if projection.Name != "" && projection.FieldPath != "" { + name := projection.Name + value, found, err := nestedPrimitiveValue(obj, projection.FieldPath) + fields = append(fields, projectedField{name: name, value: value, found: found, error: err}) + } } + collection = append(collection, fields) } - // group by the extracted values for the dimension .e.g. device: iPhone, device: Android and count them later - groups := lo.GroupBy(collection, func(field projectedField) string { - return field.GetID() - }) + // Group by the combination of all projected values + groups := make(map[string][][]projectedField) + for _, fields := range collection { + var keyParts []string + for _, f := range fields { + keyParts = append(keyParts, fmt.Sprintf("%s: %s", f.name, f.value)) + } + key := strings.Join(keyParts, ", ") + groups[key] = append(groups[key], fields) + } return groups }