Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion crates/adapters/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2960,8 +2960,9 @@ impl CircuitThread {
} else {
debug!("circuit thread: calling 'circuit.transaction'");
self.controller.increment_transaction_number();
// FIXME: we're using "span" for both step() (above) and transaction() (here).
SamplySpan::new("step")
.with_category("Step")
.with_tooltip(|| format!("step {}", self.step))
.in_scope(|| self.circuit.transaction())
.unwrap_or_else(|e| self.controller.error(Arc::new(e.into()), None));
debug!("circuit thread: 'circuit.transaction' returned");
Expand Down
13 changes: 12 additions & 1 deletion crates/adapters/src/controller/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use base64::{Engine, prelude::BASE64_URL_SAFE_NO_PAD};
use chrono::{DateTime, Utc};
use cpu_time::ProcessTime;
use crossbeam::sync::Unparker;
use dbsp::utils::process_rss_bytes;
use dbsp::{samply::SamplyEvent, utils::process_rss_bytes};
use feldera_adapterlib::{
errors::journal::ControllerError,
format::{BufferSize, ParseError},
Expand All @@ -69,6 +69,7 @@ use feldera_types::{
};
use parking_lot::{RwLock, RwLockReadGuard};
use serde::{Deserialize, Serialize};
use size_of::HumanBytes;
use std::{
collections::{BTreeMap, BTreeSet, VecDeque},
fmt::Display,
Expand Down Expand Up @@ -2108,6 +2109,16 @@ impl InputEndpointStatus {
) {
let num_records = step_results.amt.records as u64;
let num_bytes = step_results.amt.bytes as u64;
SamplyEvent::new("input")
.with_category("Step")
.with_tooltip(|| {
format!(
"{} submitted {num_records} records ({} bytes) for step {total_initiated_steps}",
&self.endpoint_name,
HumanBytes::from(num_bytes)
)
})
.record();
*self.progress.lock().unwrap() = Some(step_results);
self.metrics
.buffered_records
Expand Down
122 changes: 99 additions & 23 deletions crates/dbsp/src/samply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,6 @@ impl<'de> Deserialize<'de> for Timestamp {
}
}

/// A marker span for the [samply] profiler.
///
/// Constructing and dropping a [SamplySpan], when marker spans are being
/// captured with [Markers::capture], records the start and end times of the
/// [SamplySpan] along with a name, category, and tooltip.
///
/// [samply]: https://github.com/mstange/samply?tab=readme-ov-file#samply
/// [module documentation]: crate::samply
pub struct SamplySpan(Option<SpanInner>);

impl Debug for SamplySpan {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "SamplySpan")?;
Expand All @@ -126,10 +116,24 @@ struct SpanInner {

impl SpanInner {
#[cold]
fn record(self) {
fn new(name: &'static str) -> Self {
Self {
start: Timestamp::now(),
category: "Other",
name,
tooltip: String::new(),
}
}

#[cold]
fn record(self, is_span: bool) {
let marker = Marker {
start: self.start,
end: Timestamp::now(),
end: if is_span {
Timestamp::now()
} else {
self.start
},
category: self.category,
name: self.name,
tooltip: self.tooltip,
Expand All @@ -138,6 +142,18 @@ impl SpanInner {
}
}

/// A marker span for the [samply] profiler.
///
/// Constructing and dropping a [SamplySpan], when marker spans are being
/// captured with [Markers::capture], records the start and end times of the
/// [SamplySpan] along with a name, category, and tooltip.
///
/// `SamplySpan` is for timespans. Use [SamplyEvent] for point-in-time events.
///
/// [samply]: https://github.com/mstange/samply?tab=readme-ov-file#samply
/// [module documentation]: crate::samply
pub struct SamplySpan(Option<SpanInner>);

impl SamplySpan {
/// Constructs a new [SamplySpan] with the given name. When the constructed
/// span is dropped, it is automatically recorded.
Expand All @@ -152,16 +168,7 @@ impl SamplySpan {
/// horizontal timeline (unless that would cause overlaps).
#[must_use]
pub fn new(name: &'static str) -> Self {
if ENABLE_MARKERS.load(Ordering::Acquire) {
Self(Some(SpanInner {
start: Timestamp::now(),
category: "Other",
name,
tooltip: String::new(),
}))
} else {
Self(None)
}
Self(markers_enabled().then(|| SpanInner::new(name)))
}

/// Adds `category` to this span.
Expand Down Expand Up @@ -224,7 +231,72 @@ impl SamplySpan {
impl Drop for SamplySpan {
fn drop(&mut self) {
if let Some(inner) = self.0.take() {
inner.record()
inner.record(true)
}
}
}

/// A marker event for the [samply] profiler.
///
/// When [Markers::capture] is running, use this type to record an event along
/// with a name, category, and tooltip.
///
/// An event happens at a point in time; use [SamplySpan] to record a timespan.
///
/// [samply]: https://github.com/mstange/samply?tab=readme-ov-file#samply
/// [module documentation]: crate::samply
pub struct SamplyEvent(Option<SpanInner>);

impl SamplyEvent {
/// Constructs a new [SamplyEvent] with the given name.
///
/// [SamplyEvent] does nothing when markers are not being captured.
///
/// The name should ordinarily be a short static string indicating what the
/// event did. The Firefox Profiler's marker chart view shows all the
/// events in a thread with the same name and category on a single
/// horizontal timeline (unless that would cause overlaps).
#[must_use]
pub fn new(name: &'static str) -> Self {
Self(markers_enabled().then(|| SpanInner::new(name)))
}

/// Adds `category` to this event.
///
/// The Firefox Profiler's marker chart view groups the markers in each
/// category and labels them with the category name.
///
/// The default category is "Other".
#[must_use]
pub fn with_category(mut self, category: &'static str) -> Self {
if let Some(inner) = &mut self.0 {
inner.category = category;
}
self
}

/// Evaluates `tooltip` and adds it to this event.
///
/// The Firefox Profiler shows the given tooltip in the marker chart
/// timeline (often truncated) and on hover, and as "details" in the marker
/// table view.
///
/// `tooltip` is only evaluated if samply is running.
#[must_use]
pub fn with_tooltip<F>(mut self, tooltip: F) -> Self
where
F: FnOnce() -> String,
{
if let Some(inner) = &mut self.0 {
inner.tooltip = tooltip();
}
self
}

/// Records the event.
pub fn record(self) {
if let Some(inner) = self.0 {
inner.record(false);
}
}
}
Expand Down Expand Up @@ -474,6 +546,10 @@ impl Markers {
/// Whether markers are currently being captured.
static ENABLE_MARKERS: AtomicBool = AtomicBool::new(false);

fn markers_enabled() -> bool {
ENABLE_MARKERS.load(Ordering::Acquire)
}

/// A single marker as captured.
struct Marker {
/// Start time.
Expand Down
Loading