@@ -84,6 +84,57 @@ func (s *instrumentedClientStream) Header() (metadata.MD, error) {
8484 return md , err
8585}
8686
87+ // PrometheusGRPCReusableStreamInstrumentation records duration of reusable streaming gRPC requests client side.
88+ func PrometheusGRPCReusableStreamInstrumentation (metric * prometheus.HistogramVec ) grpc.StreamClientInterceptor {
89+ return func (ctx context.Context , desc * grpc.StreamDesc , cc * grpc.ClientConn , method string ,
90+ streamer grpc.Streamer , opts ... grpc.CallOption ,
91+ ) (grpc.ClientStream , error ) {
92+ stream , err := streamer (ctx , desc , cc , method , opts ... )
93+ return & instrumentedReusableClientStream {
94+ metric : metric ,
95+ method : method ,
96+ ClientStream : stream ,
97+ }, err
98+ }
99+ }
100+
101+ type instrumentedReusableClientStream struct {
102+ metric * prometheus.HistogramVec
103+ method string
104+ grpc.ClientStream
105+ }
106+
107+ func (s * instrumentedReusableClientStream ) SendMsg (m interface {}) error {
108+ start := time .Now ()
109+ err := s .ClientStream .SendMsg (m )
110+ if err != nil && err != io .EOF {
111+ s .metric .WithLabelValues (s .method , errorCode (err )).Observe (time .Since (start ).Seconds ())
112+ return err
113+ }
114+ s .metric .WithLabelValues (s .method , errorCode (nil )).Observe (time .Since (start ).Seconds ())
115+ return err
116+ }
117+
118+ func (s * instrumentedReusableClientStream ) RecvMsg (m interface {}) error {
119+ start := time .Now ()
120+ err := s .ClientStream .RecvMsg (m )
121+ if err != nil && err != io .EOF {
122+ s .metric .WithLabelValues (s .method , errorCode (err )).Observe (time .Since (start ).Seconds ())
123+ return err
124+ }
125+ s .metric .WithLabelValues (s .method , errorCode (nil )).Observe (time .Since (start ).Seconds ())
126+ return err
127+ }
128+
129+ func (s * instrumentedReusableClientStream ) Header () (metadata.MD , error ) {
130+ start := time .Now ()
131+ md , err := s .ClientStream .Header ()
132+ if err != nil {
133+ s .metric .WithLabelValues (s .method , errorCode (err )).Observe (time .Since (start ).Seconds ())
134+ }
135+ return md , err
136+ }
137+
87138func errorCode (err error ) string {
88139 respStatus := "2xx"
89140 if err != nil {
0 commit comments