diff --git a/crates/adapters/src/controller.rs b/crates/adapters/src/controller.rs index 94a16110029..33575fec75b 100644 --- a/crates/adapters/src/controller.rs +++ b/crates/adapters/src/controller.rs @@ -2960,8 +2960,9 @@ impl CircuitThread { } else { debug!("circuit thread: calling 'circuit.transaction'"); self.controller.increment_transaction_number(); - // FIXME: we're using "span" for both step() (above) and transaction() (here). SamplySpan::new("step") + .with_category("Step") + .with_tooltip(|| format!("step {}", self.step)) .in_scope(|| self.circuit.transaction()) .unwrap_or_else(|e| self.controller.error(Arc::new(e.into()), None)); debug!("circuit thread: 'circuit.transaction' returned"); diff --git a/crates/adapters/src/controller/stats.rs b/crates/adapters/src/controller/stats.rs index c5adb94de30..8fc05759bbd 100644 --- a/crates/adapters/src/controller/stats.rs +++ b/crates/adapters/src/controller/stats.rs @@ -46,7 +46,7 @@ use base64::{Engine, prelude::BASE64_URL_SAFE_NO_PAD}; use chrono::{DateTime, Utc}; use cpu_time::ProcessTime; use crossbeam::sync::Unparker; -use dbsp::utils::process_rss_bytes; +use dbsp::{samply::SamplyEvent, utils::process_rss_bytes}; use feldera_adapterlib::{ errors::journal::ControllerError, format::{BufferSize, ParseError}, @@ -69,6 +69,7 @@ use feldera_types::{ }; use parking_lot::{RwLock, RwLockReadGuard}; use serde::{Deserialize, Serialize}; +use size_of::HumanBytes; use std::{ collections::{BTreeMap, BTreeSet, VecDeque}, fmt::Display, @@ -2108,6 +2109,16 @@ impl InputEndpointStatus { ) { let num_records = step_results.amt.records as u64; let num_bytes = step_results.amt.bytes as u64; + SamplyEvent::new("input") + .with_category("Step") + .with_tooltip(|| { + format!( + "{} submitted {num_records} records ({} bytes) for step {total_initiated_steps}", + &self.endpoint_name, + HumanBytes::from(num_bytes) + ) + }) + .record(); *self.progress.lock().unwrap() = Some(step_results); self.metrics .buffered_records diff --git a/crates/dbsp/src/samply.rs b/crates/dbsp/src/samply.rs index 3a6d21e2923..88d0632a176 100644 --- a/crates/dbsp/src/samply.rs +++ b/crates/dbsp/src/samply.rs @@ -97,16 +97,6 @@ impl<'de> Deserialize<'de> for Timestamp { } } -/// A marker span for the [samply] profiler. -/// -/// Constructing and dropping a [SamplySpan], when marker spans are being -/// captured with [Markers::capture], records the start and end times of the -/// [SamplySpan] along with a name, category, and tooltip. -/// -/// [samply]: https://github.com/mstange/samply?tab=readme-ov-file#samply -/// [module documentation]: crate::samply -pub struct SamplySpan(Option); - impl Debug for SamplySpan { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "SamplySpan")?; @@ -126,10 +116,24 @@ struct SpanInner { impl SpanInner { #[cold] - fn record(self) { + fn new(name: &'static str) -> Self { + Self { + start: Timestamp::now(), + category: "Other", + name, + tooltip: String::new(), + } + } + + #[cold] + fn record(self, is_span: bool) { let marker = Marker { start: self.start, - end: Timestamp::now(), + end: if is_span { + Timestamp::now() + } else { + self.start + }, category: self.category, name: self.name, tooltip: self.tooltip, @@ -138,6 +142,18 @@ impl SpanInner { } } +/// A marker span for the [samply] profiler. +/// +/// Constructing and dropping a [SamplySpan], when marker spans are being +/// captured with [Markers::capture], records the start and end times of the +/// [SamplySpan] along with a name, category, and tooltip. +/// +/// `SamplySpan` is for timespans. Use [SamplyEvent] for point-in-time events. +/// +/// [samply]: https://github.com/mstange/samply?tab=readme-ov-file#samply +/// [module documentation]: crate::samply +pub struct SamplySpan(Option); + impl SamplySpan { /// Constructs a new [SamplySpan] with the given name. When the constructed /// span is dropped, it is automatically recorded. @@ -152,16 +168,7 @@ impl SamplySpan { /// horizontal timeline (unless that would cause overlaps). #[must_use] pub fn new(name: &'static str) -> Self { - if ENABLE_MARKERS.load(Ordering::Acquire) { - Self(Some(SpanInner { - start: Timestamp::now(), - category: "Other", - name, - tooltip: String::new(), - })) - } else { - Self(None) - } + Self(markers_enabled().then(|| SpanInner::new(name))) } /// Adds `category` to this span. @@ -224,7 +231,72 @@ impl SamplySpan { impl Drop for SamplySpan { fn drop(&mut self) { if let Some(inner) = self.0.take() { - inner.record() + inner.record(true) + } + } +} + +/// A marker event for the [samply] profiler. +/// +/// When [Markers::capture] is running, use this type to record an event along +/// with a name, category, and tooltip. +/// +/// An event happens at a point in time; use [SamplySpan] to record a timespan. +/// +/// [samply]: https://github.com/mstange/samply?tab=readme-ov-file#samply +/// [module documentation]: crate::samply +pub struct SamplyEvent(Option); + +impl SamplyEvent { + /// Constructs a new [SamplyEvent] with the given name. + /// + /// [SamplyEvent] does nothing when markers are not being captured. + /// + /// The name should ordinarily be a short static string indicating what the + /// event did. The Firefox Profiler's marker chart view shows all the + /// events in a thread with the same name and category on a single + /// horizontal timeline (unless that would cause overlaps). + #[must_use] + pub fn new(name: &'static str) -> Self { + Self(markers_enabled().then(|| SpanInner::new(name))) + } + + /// Adds `category` to this event. + /// + /// The Firefox Profiler's marker chart view groups the markers in each + /// category and labels them with the category name. + /// + /// The default category is "Other". + #[must_use] + pub fn with_category(mut self, category: &'static str) -> Self { + if let Some(inner) = &mut self.0 { + inner.category = category; + } + self + } + + /// Evaluates `tooltip` and adds it to this event. + /// + /// The Firefox Profiler shows the given tooltip in the marker chart + /// timeline (often truncated) and on hover, and as "details" in the marker + /// table view. + /// + /// `tooltip` is only evaluated if samply is running. + #[must_use] + pub fn with_tooltip(mut self, tooltip: F) -> Self + where + F: FnOnce() -> String, + { + if let Some(inner) = &mut self.0 { + inner.tooltip = tooltip(); + } + self + } + + /// Records the event. + pub fn record(self) { + if let Some(inner) = self.0 { + inner.record(false); } } } @@ -474,6 +546,10 @@ impl Markers { /// Whether markers are currently being captured. static ENABLE_MARKERS: AtomicBool = AtomicBool::new(false); +fn markers_enabled() -> bool { + ENABLE_MARKERS.load(Ordering::Acquire) +} + /// A single marker as captured. struct Marker { /// Start time.