diff options
Diffstat (limited to 'pkg/metric')
-rw-r--r-- | pkg/metric/BUILD | 8 | ||||
-rw-r--r-- | pkg/metric/metric.go | 147 | ||||
-rw-r--r-- | pkg/metric/metric.proto | 14 | ||||
-rw-r--r-- | pkg/metric/metric_test.go | 145 |
4 files changed, 301 insertions, 13 deletions
diff --git a/pkg/metric/BUILD b/pkg/metric/BUILD index 0a6a5d215..c08792751 100644 --- a/pkg/metric/BUILD +++ b/pkg/metric/BUILD @@ -4,13 +4,16 @@ package(licenses = ["notice"]) go_library( name = "metric", - srcs = ["metric.go"], + srcs = [ + "metric.go", + ], visibility = ["//:sandbox"], deps = [ ":metric_go_proto", "//pkg/eventchannel", "//pkg/log", "//pkg/sync", + "@org_golang_google_protobuf//types/known/timestamppb", ], ) @@ -18,6 +21,9 @@ proto_library( name = "metric", srcs = ["metric.proto"], visibility = ["//:sandbox"], + deps = [ + "@com_google_protobuf//:timestamp_proto", + ], ) go_test( diff --git a/pkg/metric/metric.go b/pkg/metric/metric.go index 4829ae7ce..ac38ec894 100644 --- a/pkg/metric/metric.go +++ b/pkg/metric/metric.go @@ -20,7 +20,9 @@ import ( "fmt" "sort" "sync/atomic" + "time" + "google.golang.org/protobuf/types/known/timestamppb" "gvisor.dev/gvisor/pkg/eventchannel" "gvisor.dev/gvisor/pkg/log" pb "gvisor.dev/gvisor/pkg/metric/metric_go_proto" @@ -54,6 +56,27 @@ var ( }) ) +// InitStage is the name of a Sentry initialization stage. +type InitStage string + +// List of all Sentry initialization stages. +var ( + InitRestoreConfig InitStage = "restore_config" + InitExecConfig InitStage = "exec_config" + InitRestore InitStage = "restore" + InitCreateProcess InitStage = "create_process" + InitTaskStart InitStage = "task_start" + + // allStages is the list of allowed stages. + allStages = []InitStage{ + InitRestoreConfig, + InitExecConfig, + InitRestore, + InitCreateProcess, + InitTaskStart, + } +) + // Uint64Metric encapsulates a uint64 that represents some kind of metric to be // monitored. We currently support metrics with at most one field. // @@ -98,6 +121,10 @@ func Initialize() error { for _, v := range allMetrics.m { m.Metrics = append(m.Metrics, v.metadata) } + m.Stages = make([]string, 0, len(allStages)) + for _, s := range allStages { + m.Stages = append(m.Stages, string(s)) + } if err := eventchannel.Emit(&m); err != nil { return fmt.Errorf("unable to emit metric initialize event: %w", err) } @@ -287,34 +314,66 @@ func (m *Uint64Metric) IncrementBy(v uint64, fieldValues ...string) { } } -// metricSet holds named metrics. +// stageTiming contains timing data for an initialization stage. +type stageTiming struct { + stage InitStage + started time.Time + // ended is the zero time when the stage has not ended yet. + ended time.Time +} + +// inProgress returns whether this stage hasn't ended yet. +func (s stageTiming) inProgress() bool { + return !s.started.IsZero() && s.ended.IsZero() +} + +// metricSet holds metric data. type metricSet struct { + // Map of metrics. m map[string]customUint64Metric + + // mu protects the fields below. + mu sync.RWMutex + + // Information about the stages reached by the Sentry. Only appended to, so + // reading a shallow copy of the slice header concurrently is safe. + finished []stageTiming + + // The current stage in progress. + currentStage stageTiming } // makeMetricSet returns a new metricSet. func makeMetricSet() metricSet { return metricSet{ - m: make(map[string]customUint64Metric), + m: make(map[string]customUint64Metric), + finished: make([]stageTiming, 0, len(allStages)), } } // Values returns a snapshot of all values in m. func (m *metricSet) Values() metricValues { - vals := make(metricValues) + m.mu.Lock() + stages := m.finished[:] + m.mu.Unlock() + + vals := metricValues{ + m: make(map[string]interface{}, len(m.m)), + stages: stages, + } for k, v := range m.m { fields := v.metadata.GetFields() switch len(fields) { case 0: - vals[k] = v.value() + vals.m[k] = v.value() case 1: values := fields[0].GetAllowedValues() fieldsMap := make(map[string]uint64) for _, fieldValue := range values { fieldsMap[fieldValue] = v.value(fieldValue) } - vals[k] = fieldsMap + vals.m[k] = fieldsMap default: panic(fmt.Sprintf("Unsupported number of metric fields: %d", len(fields))) } @@ -322,10 +381,16 @@ func (m *metricSet) Values() metricValues { return vals } -// metricValues contains a copy of the values of all metrics. It is a map -// with key as metric name and value can be either uint64 or map[string]uint64 -// to support metrics with one field. -type metricValues map[string]interface{} +// metricValues contains a copy of the values of all metrics. +type metricValues struct { + // m is a map with key as metric name and value can be either uint64 or + // map[string]uint64 to support metrics with one field. + m map[string]interface{} + + // Information on when initialization stages were reached. Does not include + // the currently-ongoing stage, if any. + stages []stageTiming +} var ( // emitMu protects metricsAtLastEmit and ensures that all emitted @@ -354,8 +419,8 @@ func EmitMetricUpdate() { m := pb.MetricUpdate{} // On the first call metricsAtLastEmit will be empty. Include all // metrics then. - for k, v := range snapshot { - prev, ok := metricsAtLastEmit[k] + for k, v := range snapshot.m { + prev, ok := metricsAtLastEmit.m[k] switch t := v.(type) { case uint64: // Metric exists and value did not change. @@ -386,8 +451,23 @@ func EmitMetricUpdate() { } } + for s := len(metricsAtLastEmit.stages); s < len(snapshot.stages); s++ { + newStage := snapshot.stages[s] + m.StageTiming = append(m.StageTiming, &pb.StageTiming{ + Stage: string(newStage.stage), + Started: ×tamppb.Timestamp{ + Seconds: newStage.started.Unix(), + Nanos: int32(newStage.started.Nanosecond()), + }, + Ended: ×tamppb.Timestamp{ + Seconds: newStage.ended.Unix(), + Nanos: int32(newStage.ended.Nanosecond()), + }, + }) + } + metricsAtLastEmit = snapshot - if len(m.Metrics) == 0 { + if len(m.Metrics) == 0 && len(m.StageTiming) == 0 { return } @@ -399,9 +479,52 @@ func EmitMetricUpdate() { for _, metric := range m.Metrics { log.Debugf("%s: %+v", metric.Name, metric.Value) } + for _, stage := range m.StageTiming { + duration := time.Duration(stage.Ended.Seconds-stage.Started.Seconds)*time.Second + time.Duration(stage.Ended.Nanos-stage.Started.Nanos)*time.Nanosecond + log.Debugf("Stage %s took %v", stage.GetStage(), duration) + } } if err := eventchannel.Emit(&m); err != nil { log.Warningf("Unable to emit metrics: %s", err) } } + +// StartStage should be called when an initialization stage is started. +// It returns a function that must be called to indicate that the stage ended. +// Alternatively, future calls to StartStage will implicitly indicate that the +// previous stage ended. +// Stage information will be emitted in the next call to EmitMetricUpdate after +// a stage has ended. +// +// This function may (and is expected to) be called prior to final +// initialization of this metric library, as it has to capture early stages +// of Sentry initialization. +func StartStage(stage InitStage) func() { + now := time.Now() + allMetrics.mu.Lock() + defer allMetrics.mu.Unlock() + if allMetrics.currentStage.inProgress() { + endStage(now) + } + allMetrics.currentStage.stage = stage + allMetrics.currentStage.started = now + return func() { + now := time.Now() + allMetrics.mu.Lock() + defer allMetrics.mu.Unlock() + // The current stage may have been ended by another call to StartStage, so + // double-check prior to clearing the current stage. + if allMetrics.currentStage.inProgress() && allMetrics.currentStage.stage == stage { + endStage(now) + } + } +} + +// endStage marks allMetrics.currentStage as ended, adding it to the list of +// finished stages. It assumes allMetrics.mu is locked. +func endStage(when time.Time) { + allMetrics.currentStage.ended = when + allMetrics.finished = append(allMetrics.finished, allMetrics.currentStage) + allMetrics.currentStage = stageTiming{} +} diff --git a/pkg/metric/metric.proto b/pkg/metric/metric.proto index 53c8b4b50..d466b6904 100644 --- a/pkg/metric/metric.proto +++ b/pkg/metric/metric.proto @@ -16,6 +16,8 @@ syntax = "proto3"; package gvisor; +import "google/protobuf/timestamp.proto"; + // MetricMetadata contains all of the metadata describing a single metric. message MetricMetadata { // name is the unique name of the metric, usually in a "directory" format @@ -63,6 +65,7 @@ message MetricMetadata { // future MetricUpdates. message MetricRegistration { repeated MetricMetadata metrics = 1; + repeated string stages = 2; } // MetricValue the value of a metric at a single point in time. @@ -79,9 +82,20 @@ message MetricValue { repeated string field_values = 4; } +// StageTiming represents a new stage that's been reached by the Sentry. +message StageTiming { + string stage = 1; + google.protobuf.Timestamp started = 2; + google.protobuf.Timestamp ended = 3; +} + // MetricUpdate contains new values for multiple distinct metrics. // // Metrics whose values have not changed are not included. message MetricUpdate { repeated MetricValue metrics = 1; + // Timing information of initialization stages reached since last update. + // The first MetricUpdate will include multiple entries, since metric + // initialization happens relatively late in the Sentry startup process. + repeated StageTiming stage_timing = 2; } diff --git a/pkg/metric/metric_test.go b/pkg/metric/metric_test.go index 1b4a9e73a..0654bdf07 100644 --- a/pkg/metric/metric_test.go +++ b/pkg/metric/metric_test.go @@ -16,6 +16,7 @@ package metric import ( "testing" + "time" "google.golang.org/protobuf/proto" "gvisor.dev/gvisor/pkg/eventchannel" @@ -352,3 +353,147 @@ func TestEmitMetricUpdateWithFields(t *testing.T) { t.Errorf("Field value weird2 not found: %+v", emitter) } } + +func TestMetricUpdateStageTiming(t *testing.T) { + defer reset() + + expectedTimings := map[InitStage]struct{ min, max time.Duration }{} + measureStage := func(stage InitStage, body func()) { + stageStarted := time.Now() + endStage := StartStage(stage) + bodyStarted := time.Now() + body() + bodyEnded := time.Now() + endStage() + stageEnded := time.Now() + + expectedTimings[stage] = struct{ min, max time.Duration }{ + min: bodyEnded.Sub(bodyStarted), + max: stageEnded.Sub(stageStarted), + } + } + checkStage := func(got *pb.StageTiming, want InitStage) { + if InitStage(got.GetStage()) != want { + t.Errorf("%v: got stage %q expected %q", got, got.GetStage(), want) + } + timingBounds, found := expectedTimings[want] + if !found { + t.Fatalf("invalid init stage name %q", want) + } + started := got.Started.AsTime() + ended := got.Ended.AsTime() + duration := ended.Sub(started) + if duration < timingBounds.min { + t.Errorf("stage %v: lasted %v, expected at least %v", want, duration, timingBounds.min) + } else if duration > timingBounds.max { + t.Errorf("stage %v: lasted %v, expected no more than %v", want, duration, timingBounds.max) + } + } + + // Test that it's legit to go through stages before metric registration. + measureStage("before_first_update_1", func() { + time.Sleep(100 * time.Millisecond) + }) + measureStage("before_first_update_2", func() { + time.Sleep(100 * time.Millisecond) + }) + + fooMetric, err := NewUint64Metric("/foo", false, pb.MetricMetadata_UNITS_NONE, fooDescription) + if err != nil { + t.Fatalf("Cannot register /foo: %v", err) + } + emitter.Reset() + Initialize() + EmitMetricUpdate() + + // We should have gotten the metric registration and the first MetricUpdate. + if len(emitter) != 2 { + t.Fatalf("emitter has %d messages (%v), expected %d", len(emitter), emitter, 2) + } + + if registration, ok := emitter[0].(*pb.MetricRegistration); !ok { + t.Errorf("first message is not MetricRegistration: %T / %v", emitter[0], emitter[0]) + } else if len(registration.Stages) != len(allStages) { + t.Errorf("MetricRegistration has %d stages (%v), expected %d (%v)", len(registration.Stages), registration.Stages, len(allStages), allStages) + } else { + for i := 0; i < len(allStages); i++ { + if InitStage(registration.Stages[i]) != allStages[i] { + t.Errorf("MetricRegistration.Stages[%d]: got %q want %q", i, registration.Stages[i], allStages[i]) + } + } + } + + if firstUpdate, ok := emitter[1].(*pb.MetricUpdate); !ok { + t.Errorf("second message is not MetricUpdate: %T / %v", emitter[1], emitter[1]) + } else if len(firstUpdate.StageTiming) != 2 { + t.Errorf("MetricUpdate has %d stage timings (%v), expected %d", len(firstUpdate.StageTiming), firstUpdate.StageTiming, 2) + } else { + checkStage(firstUpdate.StageTiming[0], "before_first_update_1") + checkStage(firstUpdate.StageTiming[1], "before_first_update_2") + } + + // Ensure re-emitting doesn't cause another event to be sent. + emitter.Reset() + EmitMetricUpdate() + if len(emitter) != 0 { + t.Fatalf("EmitMetricUpdate emitted %d events want %d", len(emitter), 0) + } + + // Generate monitoring data, we should get an event with no stages. + fooMetric.Increment() + emitter.Reset() + EmitMetricUpdate() + if len(emitter) != 1 { + t.Fatalf("EmitMetricUpdate emitted %d events want %d", len(emitter), 1) + } else if update, ok := emitter[0].(*pb.MetricUpdate); !ok { + t.Errorf("message is not MetricUpdate: %T / %v", emitter[1], emitter[1]) + } else if len(update.StageTiming) != 0 { + t.Errorf("unexpected stage timing information: %v", update.StageTiming) + } + + // Now generate new stages. + measureStage("foo_stage_1", func() { + time.Sleep(100 * time.Millisecond) + }) + measureStage("foo_stage_2", func() { + time.Sleep(100 * time.Millisecond) + }) + emitter.Reset() + EmitMetricUpdate() + if len(emitter) != 1 { + t.Fatalf("EmitMetricUpdate emitted %d events want %d", len(emitter), 1) + } else if update, ok := emitter[0].(*pb.MetricUpdate); !ok { + t.Errorf("message is not MetricUpdate: %T / %v", emitter[1], emitter[1]) + } else if len(update.Metrics) != 0 { + t.Errorf("MetricUpdate has %d metric value changes (%v), expected %d", len(update.Metrics), update.Metrics, 0) + } else if len(update.StageTiming) != 2 { + t.Errorf("MetricUpdate has %d stages (%v), expected %d", len(update.StageTiming), update.StageTiming, 2) + } else { + checkStage(update.StageTiming[0], "foo_stage_1") + checkStage(update.StageTiming[1], "foo_stage_2") + } + + // Now try generating data for both metrics and stages. + fooMetric.Increment() + measureStage("last_stage_1", func() { + time.Sleep(100 * time.Millisecond) + }) + measureStage("last_stage_2", func() { + time.Sleep(100 * time.Millisecond) + }) + fooMetric.Increment() + emitter.Reset() + EmitMetricUpdate() + if len(emitter) != 1 { + t.Fatalf("EmitMetricUpdate emitted %d events want %d", len(emitter), 1) + } else if update, ok := emitter[0].(*pb.MetricUpdate); !ok { + t.Errorf("message is not MetricUpdate: %T / %v", emitter[1], emitter[1]) + } else if len(update.Metrics) != 1 { + t.Errorf("MetricUpdate has %d metric value changes (%v), expected %d", len(update.Metrics), update.Metrics, 1) + } else if len(update.StageTiming) != 2 { + t.Errorf("MetricUpdate has %d stages (%v), expected %d", len(update.StageTiming), update.StageTiming, 2) + } else { + checkStage(update.StageTiming[0], "last_stage_1") + checkStage(update.StageTiming[1], "last_stage_2") + } +} |