Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 53 additions & 48 deletions internal/orchestrator/metrichandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"strconv"
"strings"

"github.com/samber/lo"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -94,41 +93,43 @@ func (h *MetricHandler) projectionsMonitor(ctx context.Context, list *unstructur
// Add base dimensions only if they have a non-empty value
h.setDataPointBaseDimensions(dataPoint)

// Add projected dimensions for this specific group
for _, pField := range group {
// Add projected dimension only if the value is non-empty and no error occurred
if pField.error == nil && pField.value != "" {
dataPoint.AddDimension(pField.name, pField.value)
} else {
// Optionally log or handle projection errors
recordErrors = append(recordErrors, fmt.Errorf("projection error for %s: %w", pField.name, pField.error))
for _, inGroup := range group {
for _, pField := range inGroup {
// Add projected dimension only if the value is non-empty and no error occurred
if pField.error == nil && pField.value != "" {
dataPoint.AddDimension(pField.name, pField.value)
} else {
// Optionally log or handle projection errors
recordErrors = append(recordErrors, fmt.Errorf("projection error for %s: %w", pField.name, pField.error))
}
}

dataPoints = append(dataPoints, dataPoint)
}
dataPoints = append(dataPoints, dataPoint)
}

// Record all collected data points
errRecord := h.gaugeMetric.RecordMetrics(ctx, dataPoints...)
if errRecord != nil {
recordErrors = append(recordErrors, errRecord)
}
// Record all collected data points
errRecord := h.gaugeMetric.RecordMetrics(ctx, dataPoints...)
if errRecord != nil {
recordErrors = append(recordErrors, errRecord)
}

// Update result based on errors during projection or recording
if len(recordErrors) > 0 {
// Combine errors for reporting
combinedError := fmt.Errorf("errors during metric recording: %v", recordErrors)
result.Error = combinedError
result.Phase = v1alpha1.PhaseFailed
result.Reason = "RecordMetricFailed"
result.Message = fmt.Sprintf("failed to record metric value(s): %s", combinedError.Error())
} else {
result.Phase = v1alpha1.PhaseActive
result.Reason = v1alpha1.ReasonMonitoringActive
result.Message = fmt.Sprintf("metric values recorded for resource '%s'", h.metric.GvkToString())
// Observation might need adjustment depending on how results should be represented in status
result.Observation = &v1alpha1.MetricObservation{Timestamp: metav1.Now(), LatestValue: strconv.Itoa(len(list.Items))} // Report total count for now
}
// Return the result, error indicates failure in Monitor execution, not necessarily metric export failure (handled by controller)
// Update result based on errors during projection or recording
if len(recordErrors) > 0 {
// Combine errors for reporting
combinedError := fmt.Errorf("errors during metric recording: %v", recordErrors)
result.Error = combinedError
result.Phase = v1alpha1.PhaseFailed
result.Reason = "RecordMetricFailed"
result.Message = fmt.Sprintf("failed to record metric value(s): %s", combinedError.Error())
} else {
result.Phase = v1alpha1.PhaseActive
result.Reason = v1alpha1.ReasonMonitoringActive
result.Message = fmt.Sprintf("metric values recorded for resource '%s'", h.metric.GvkToString())
// Observation might need adjustment depending on how results should be represented in status
result.Observation = &v1alpha1.MetricObservation{Timestamp: metav1.Now(), LatestValue: strconv.Itoa(len(list.Items))} // Report total count for now
}
// Return the result, error indicates failure in Monitor execution, not necessarily metric export failure (handled by controller)
}
return result, nil
}

Expand Down Expand Up @@ -158,27 +159,31 @@ func (e *projectedField) GetID() string {
return fmt.Sprintf("%s: %s", e.name, e.value)
}

func (h *MetricHandler) extractProjectionGroupsFrom(list *unstructured.UnstructuredList) map[string][]projectedField {
// note: for now we only allow one projection, so we can use the first one
// the reason for this is that if we have multiple projections, we need to create a cartesian product of all projections
// this is to be done at a later time
var collection []projectedField
func (h *MetricHandler) extractProjectionGroupsFrom(list *unstructured.UnstructuredList) map[string][][]projectedField {
collection := make([][]projectedField, 0, len(list.Items))

for _, obj := range list.Items {

projection := lo.FirstOr(h.metric.Spec.Projections, v1alpha1.Projection{})

if projection.Name != "" && projection.FieldPath != "" {
name := projection.Name
value, found, err := nestedPrimitiveValue(obj, projection.FieldPath)
collection = append(collection, projectedField{name: name, value: value, found: found, error: err})
var fields []projectedField
for _, projection := range h.metric.Spec.Projections {
if projection.Name != "" && projection.FieldPath != "" {
name := projection.Name
value, found, err := nestedPrimitiveValue(obj, projection.FieldPath)
fields = append(fields, projectedField{name: name, value: value, found: found, error: err})
}
}
collection = append(collection, fields)
}

// group by the extracted values for the dimension .e.g. device: iPhone, device: Android and count them later
groups := lo.GroupBy(collection, func(field projectedField) string {
return field.GetID()
})
// Group by the combination of all projected values
groups := make(map[string][][]projectedField)
for _, fields := range collection {
var keyParts []string
for _, f := range fields {
keyParts = append(keyParts, fmt.Sprintf("%s: %s", f.name, f.value))
}
key := strings.Join(keyParts, ", ")
groups[key] = append(groups[key], fields)
}

return groups
}
Expand Down