diff --git a/apps/webapp/app/assets/icons/SandboxesIcon.tsx b/apps/webapp/app/assets/icons/SandboxesIcon.tsx new file mode 100644 index 0000000000..8c9d6667cf --- /dev/null +++ b/apps/webapp/app/assets/icons/SandboxesIcon.tsx @@ -0,0 +1,10 @@ +export function SandboxesIcon({ className }: { className?: string }) { + return ( + + + + ); +} diff --git a/apps/webapp/app/components/navigation/SideMenu.tsx b/apps/webapp/app/components/navigation/SideMenu.tsx index ba31b0ceaa..e9246b4fef 100644 --- a/apps/webapp/app/components/navigation/SideMenu.tsx +++ b/apps/webapp/app/components/navigation/SideMenu.tsx @@ -26,6 +26,7 @@ import simplur from "simplur"; import { BranchEnvironmentIconSmall } from "~/assets/icons/EnvironmentIcons"; import { ListCheckedIcon } from "~/assets/icons/ListCheckedIcon"; import { RunsIconExtraSmall } from "~/assets/icons/RunsIcon"; +import { SandboxesIcon } from "~/assets/icons/SandboxesIcon"; import { TaskIconSmall } from "~/assets/icons/TaskIcon"; import { WaitpointTokenIcon } from "~/assets/icons/WaitpointTokenIcon"; import { Avatar } from "~/components/primitives/Avatar"; @@ -50,6 +51,7 @@ import { organizationSettingsPath, organizationTeamPath, regionsPath, + sandboxesPath, v3ApiKeysPath, v3BatchesPath, v3BillingPath, @@ -236,6 +238,13 @@ export function SideMenu({ to={v3BatchesPath(organization, project, environment)} data-action="batches" /> + + } + /> - - } - /> - - + + + + ); +} + +export function SandboxStatusLabel({ status }: { status: SandboxStatusType }) { + return {sandboxStatusTitle(status)}; +} + +export function SandboxStatusIcon({ + status, + className, +}: { + status: SandboxStatusType; + className: string; +}) { + switch (status) { + case "PENDING": + return ; + case "DEPLOYING": + return ; + case "DEPLOYED": + return ; + case "CANCELED": + return ; + case "FAILED": + return ; + default: { + assertNever(status); + } + } +} + +export function sandboxStatusClassNameColor(status: SandboxStatusType): string { + switch (status) { + 
case "PENDING": + return "text-charcoal-500"; + case "DEPLOYING": + return "text-pending"; + case "DEPLOYED": + return "text-success"; + case "CANCELED": + return "text-charcoal-500"; + case "FAILED": + return "text-error"; + default: { + assertNever(status); + } + } +} + +export function sandboxStatusTitle(status: SandboxStatusType): string { + switch (status) { + case "PENDING": + return "Queued…"; + case "DEPLOYING": + return "Deploying…"; + case "DEPLOYED": + return "Deployed"; + case "CANCELED": + return "Canceled"; + case "FAILED": + return "Failed"; + default: { + assertNever(status); + } + } +} + +export const sandboxStatuses: SandboxStatusType[] = [ + "PENDING", + "DEPLOYING", + "DEPLOYED", + "FAILED", + "CANCELED", +]; + +export function sandboxStatusDescription(status: SandboxStatusType): string { + switch (status) { + case "PENDING": + return "The sandbox is queued and waiting to be deployed."; + case "DEPLOYING": + return "The sandbox environment is being built and deployed."; + case "DEPLOYED": + return "The sandbox environment is ready to use."; + case "CANCELED": + return "The sandbox deployment was manually canceled."; + case "FAILED": + return "The sandbox deployment encountered an error and could not complete."; + default: { + assertNever(status); + } + } +} diff --git a/apps/webapp/app/components/runs/v3/TaskTriggerSource.tsx b/apps/webapp/app/components/runs/v3/TaskTriggerSource.tsx index 8d81e2f36c..0cb14dfa6a 100644 --- a/apps/webapp/app/components/runs/v3/TaskTriggerSource.tsx +++ b/apps/webapp/app/components/runs/v3/TaskTriggerSource.tsx @@ -1,5 +1,6 @@ import { ClockIcon } from "@heroicons/react/20/solid"; import type { TaskTriggerSource } from "@trigger.dev/database"; +import { SandboxesIcon } from "~/assets/icons/SandboxesIcon"; import { TaskIconSmall } from "~/assets/icons/TaskIcon"; import { cn } from "~/utils/cn"; @@ -19,6 +20,9 @@ export function TaskTriggerSourceIcon({ ); } + case "SANDBOX": { + return ; + } } } @@ -30,5 +34,8 @@ 
export function taskTriggerSourceDescription(source: TaskTriggerSource) { case "SCHEDULED": { return "Scheduled task"; } + case "SANDBOX": { + return "Sandbox task"; + } } } diff --git a/apps/webapp/app/presenters/v3/SandboxListPresenter.server.ts b/apps/webapp/app/presenters/v3/SandboxListPresenter.server.ts new file mode 100644 index 0000000000..12ca46c0e2 --- /dev/null +++ b/apps/webapp/app/presenters/v3/SandboxListPresenter.server.ts @@ -0,0 +1,153 @@ +import { + type PrismaClientOrTransaction, + type SandboxEnvironment, + type SandboxStatus, + type SandboxType, +} from "@trigger.dev/database"; +import { $replica } from "~/db.server"; +import { findCurrentWorkerFromEnvironment } from "~/v3/models/workerDeployment.server"; + +export type SandboxListItem = { + id: string; + friendlyId: string; + deduplicationKey: string; + type: SandboxType; + status: SandboxStatus; + runtime: string; + packages: string[]; + systemPackages: string[]; + imageReference: string | null; + imageVersion: string | null; + contentHash: string | null; + createdAt: Date; + updatedAt: Date; + taskCount: number; + taskSlugs: string[]; + tasks: Array<{ + slug: string; + filePath: string; + }>; +}; + +export class SandboxListPresenter { + constructor(private readonly _replica: PrismaClientOrTransaction) {} + + public async call({ + userId, + organizationSlug, + projectSlug, + environmentSlug, + }: { + userId: string; + organizationSlug: string; + projectSlug: string; + environmentSlug: string; + }): Promise<{ + sandboxes: SandboxListItem[]; + }> { + const environment = await this._replica.runtimeEnvironment.findFirstOrThrow({ + where: { + organization: { + slug: organizationSlug, + members: { + some: { + userId, + }, + }, + }, + project: { + slug: projectSlug, + }, + slug: environmentSlug, + }, + include: { + organization: true, + project: true, + }, + }); + + const currentWorker = await findCurrentWorkerFromEnvironment( + { + id: environment.id, + type: environment.type, + }, + this._replica 
+ ); + + if (!currentWorker) { + return { + sandboxes: [], + }; + } + + // Find all sandbox tasks for the current worker + const sandboxTasks = await this._replica.backgroundWorkerTask.findMany({ + where: { + workerId: currentWorker.id, + triggerSource: "SANDBOX", + sandboxEnvironmentId: { + not: null, + }, + }, + select: { + id: true, + slug: true, + filePath: true, + sandboxEnvironmentId: true, + sandboxEnvironment: true, + }, + }); + + // Group tasks by sandbox environment + const sandboxMap = new Map< + string, + SandboxEnvironment & { + taskSlugs: string[]; + tasks: Array<{ slug: string; filePath: string }>; + } + >(); + + for (const task of sandboxTasks) { + if (!task.sandboxEnvironment) continue; + + const existing = sandboxMap.get(task.sandboxEnvironment.id); + if (existing) { + existing.taskSlugs.push(task.slug); + existing.tasks.push({ slug: task.slug, filePath: task.filePath }); + } else { + sandboxMap.set(task.sandboxEnvironment.id, { + ...task.sandboxEnvironment, + taskSlugs: [task.slug], + tasks: [{ slug: task.slug, filePath: task.filePath }], + }); + } + } + + // Convert to list items + const sandboxes: SandboxListItem[] = Array.from(sandboxMap.values()).map((sandbox) => ({ + id: sandbox.id, + friendlyId: sandbox.friendlyId, + deduplicationKey: sandbox.deduplicationKey, + type: sandbox.type, + status: sandbox.status, + runtime: sandbox.runtime, + packages: sandbox.packages, + systemPackages: sandbox.systemPackages, + imageReference: sandbox.imageReference, + imageVersion: sandbox.imageVersion, + contentHash: sandbox.contentHash, + createdAt: sandbox.createdAt, + updatedAt: sandbox.updatedAt, + taskCount: sandbox.taskSlugs.length, + taskSlugs: sandbox.taskSlugs, + tasks: sandbox.tasks, + })); + + // Sort by createdAt descending + sandboxes.sort((a, b) => b.createdAt.getTime() - a.createdAt.getTime()); + + return { + sandboxes, + }; + } +} diff --git 
a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.sandboxes/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.sandboxes/route.tsx new file mode 100644 index 0000000000..302244d84e --- /dev/null +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.sandboxes/route.tsx @@ -0,0 +1,262 @@ +import { BookOpenIcon } from "@heroicons/react/20/solid"; +import { type MetaFunction, Outlet, useParams } from "@remix-run/react"; +import { type LoaderFunctionArgs } from "@remix-run/server-runtime"; +import { typedjson, useTypedLoaderData } from "remix-typedjson"; +import { MainCenteredContainer, PageBody, PageContainer } from "~/components/layout/AppLayout"; +import { RuntimeIcon } from "~/components/RuntimeIcon"; +import { LinkButton } from "~/components/primitives/Buttons"; +import { DateTime } from "~/components/primitives/DateTime"; +import { NavBar, PageAccessories, PageTitle } from "~/components/primitives/PageHeader"; +import { Paragraph } from "~/components/primitives/Paragraph"; +import { + ResizableHandle, + ResizablePanel, + ResizablePanelGroup, +} from "~/components/primitives/Resizable"; +import { + Table, + TableBlankRow, + TableBody, + TableCell, + TableHeader, + TableHeaderCell, + TableRow, +} from "~/components/primitives/Table"; +import { SimpleTooltip } from "~/components/primitives/Tooltip"; +import { TruncatedCopyableValue } from "~/components/primitives/TruncatedCopyableValue"; +import { + SandboxStatus, + sandboxStatusDescription, + sandboxStatuses, +} from "~/components/runs/v3/SandboxStatus"; +import { $replica } from "~/db.server"; +import { useEnvironment } from "~/hooks/useEnvironment"; +import { useOrganization } from "~/hooks/useOrganizations"; +import { useProject } from "~/hooks/useProject"; +import { SandboxListPresenter } from "~/presenters/v3/SandboxListPresenter.server"; +import { requireUserId } from 
"~/services/session.server"; +import { EnvironmentParamSchema, docsPath } from "~/utils/pathBuilder"; + +export const meta: MetaFunction = () => { + return [ + { + title: `Sandboxes | Trigger.dev`, + }, + ]; +}; + +export const loader = async ({ request, params }: LoaderFunctionArgs) => { + const userId = await requireUserId(request); + const { organizationSlug, projectParam, envParam } = EnvironmentParamSchema.parse(params); + + const presenter = new SandboxListPresenter($replica); + + try { + const result = await presenter.call({ + userId, + organizationSlug, + projectSlug: projectParam, + environmentSlug: envParam, + }); + + return typedjson(result); + } catch (error) { + console.error(error); + throw new Response(undefined, { + status: 400, + statusText: "Something went wrong, if this problem persists please contact support.", + }); + } +}; + +export default function Page() { + const organization = useOrganization(); + const project = useProject(); + const environment = useEnvironment(); + const { sandboxes } = useTypedLoaderData(); + const hasSandboxes = sandboxes.length > 0; + + const { sandboxParam } = useParams(); + + return ( + + + + + + Sandboxes docs + + + + + + + {hasSandboxes ? ( + + + + + ID + Task + + {sandboxStatuses.map((status) => ( + + + + + + {sandboxStatusDescription(status)} + + + ))} + + } + > + Status + + Runtime + Packages + System Packages + Created + + + + {sandboxes.map((sandbox) => { + const isSelected = sandboxParam === sandbox.friendlyId; + return ( + + + + + + {sandbox.tasks.length === 1 ? ( + + {sandbox.tasks[0].slug} + + } + disableHoverableContent + /> + ) : ( + + {sandbox.tasks.map((task) => ( + + {task.slug} + + {task.filePath} + + + ))} + + } + button={ + + {sandbox.taskCount} tasks + + } + /> + )} + + + + + + + + + + + + + + + + + + ); + })} + + + + ) : ( + + + No sandboxes found for this environment. 
+ + + )} + + + {sandboxParam && ( + <> + + + + + > + )} + + + + ); +} + +function SandboxRuntime({ runtime }: { runtime: string }) { + // Parse runtime string in format "node:22" or "bun:1.3.0" + const parts = runtime.split(":"); + const runtimeName = parts[0]; + const runtimeVersion = parts[1]; + + return ; +} + +function PackagesList({ packages }: { packages: string[] }) { + if (packages.length === 0) { + return –; + } + + // Show up to 2 packages inline + if (packages.length <= 2) { + return ( + + {packages.join(", ")} + + ); + } + + // Show first 2 packages and "X more" with tooltip + const displayPackages = packages.slice(0, 2); + const remainingCount = packages.length - 2; + + return ( + + {packages.map((pkg) => ( + + {pkg} + + ))} + + } + button={ + + {displayPackages.join(", ")} + , and {remainingCount} more + + } + /> + ); +} diff --git a/apps/webapp/app/runEngine/concerns/environmentVariablesProcessor.server.ts b/apps/webapp/app/runEngine/concerns/environmentVariablesProcessor.server.ts new file mode 100644 index 0000000000..172535a2a1 --- /dev/null +++ b/apps/webapp/app/runEngine/concerns/environmentVariablesProcessor.server.ts @@ -0,0 +1,53 @@ +import { TaskRunEnvironmentVariablesConfig } from "@trigger.dev/core/v3/schemas"; +import { EnvironmentVariablesProcessor, TriggerTaskRequest } from "../types"; +import pMap from "p-map"; +import { EncryptedSecretValue, encryptSecret } from "~/services/secrets/secretStore.server"; + +export class DefaultEnvironmentVariablesProcessor implements EnvironmentVariablesProcessor { + constructor(private readonly encryptionKey: string) {} + + async process( + request: TriggerTaskRequest + ): Promise { + if (!request.body.options?.env) { + return undefined; + } + + const { variables, whitelist, blacklist } = request.body.options.env; + + if (!variables && !whitelist && !blacklist) { + return undefined; + } + + const encryptedVariables = await pMap( + Object.entries(request.body.options.env.variables ?? 
{}), + async ([key, value]) => { + return await createSecretVariable(this.encryptionKey, key, value); + }, + { concurrency: 10 } + ); + + return { + variables: encryptedVariables.reduce((acc, curr) => { + acc[curr.key] = { + encryptor: curr.encryptor, + value: curr.value, + }; + return acc; + }, {} as Record), + whitelist, + blacklist, + override: true, + }; + } +} + +async function createSecretVariable(encryptionKey: string, key: string, value: string) { + const encryptedValue = await encryptSecret(encryptionKey, value); + + return { + key, + encryptor: "platform", + value: encryptedValue, + }; +} diff --git a/apps/webapp/app/runEngine/services/triggerTask.server.ts b/apps/webapp/app/runEngine/services/triggerTask.server.ts index 144d9b3178..56375d554d 100644 --- a/apps/webapp/app/runEngine/services/triggerTask.server.ts +++ b/apps/webapp/app/runEngine/services/triggerTask.server.ts @@ -32,6 +32,7 @@ import type { import { clampMaxDuration } from "../../v3/utils/maxDuration"; import { IdempotencyKeyConcern } from "../concerns/idempotencyKeys.server"; import type { + EnvironmentVariablesProcessor, PayloadProcessor, QueueManager, RunNumberIncrementer, @@ -61,6 +62,7 @@ export class RunEngineTriggerTaskService { private readonly traceEventConcern: TraceEventConcern; private readonly triggerRacepointSystem: TriggerRacepointSystem; private readonly metadataMaximumSize: number; + private readonly environmentVariablesProcessor: EnvironmentVariablesProcessor; constructor(opts: { prisma: PrismaClientOrTransaction; @@ -73,6 +75,7 @@ export class RunEngineTriggerTaskService { traceEventConcern: TraceEventConcern; tracer: Tracer; metadataMaximumSize: number; + environmentVariablesProcessor: EnvironmentVariablesProcessor; triggerRacepointSystem?: TriggerRacepointSystem; }) { this.prisma = opts.prisma; @@ -85,6 +88,7 @@ export class RunEngineTriggerTaskService { this.tracer = opts.tracer; this.traceEventConcern = opts.traceEventConcern; this.metadataMaximumSize = 
opts.metadataMaximumSize; + this.environmentVariablesProcessor = opts.environmentVariablesProcessor; this.triggerRacepointSystem = opts.triggerRacepointSystem ?? new NoopTriggerRacepointSystem(); } @@ -266,6 +270,10 @@ export class RunEngineTriggerTaskService { const workerQueue = await this.queueConcern.getWorkerQueue(environment, body.options?.region); + const environmentVariablesConfig = await this.environmentVariablesProcessor.process( + triggerRequest + ); + try { return await this.traceEventConcern.traceRun( triggerRequest, @@ -347,6 +355,7 @@ export class RunEngineTriggerTaskService { createdAt: options.overrideCreatedAt, bulkActionId: body.options?.bulkActionId, planType, + environmentVariablesConfig, }, this.prisma ); diff --git a/apps/webapp/app/runEngine/types.ts b/apps/webapp/app/runEngine/types.ts index 0aa52d0a40..8557fa8e49 100644 --- a/apps/webapp/app/runEngine/types.ts +++ b/apps/webapp/app/runEngine/types.ts @@ -1,5 +1,10 @@ import type { BackgroundWorker, TaskRun } from "@trigger.dev/database"; -import type { IOPacket, TaskRunError, TriggerTaskRequestBody } from "@trigger.dev/core/v3"; +import type { + IOPacket, + TaskRunEnvironmentVariablesConfig, + TaskRunError, + TriggerTaskRequestBody, +} from "@trigger.dev/core/v3"; import type { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import type { ReportUsagePlan } from "@trigger.dev/platform"; @@ -164,3 +169,7 @@ export type TriggerRacepoints = "idempotencyKey"; export interface TriggerRacepointSystem { waitForRacepoint(options: { racepoint: TriggerRacepoints; id: string }): Promise; } + +export interface EnvironmentVariablesProcessor { + process(request: TriggerTaskRequest): Promise; +} diff --git a/apps/webapp/app/utils/pathBuilder.ts b/apps/webapp/app/utils/pathBuilder.ts index 75c6c56447..7282104cbb 100644 --- a/apps/webapp/app/utils/pathBuilder.ts +++ b/apps/webapp/app/utils/pathBuilder.ts @@ -157,6 +157,14 @@ export function v3EnvironmentPath( 
)}/env/${environmentParam(environment)}`; } +export function sandboxesPath( + organization: OrgForPath, + project: ProjectForPath, + environment: EnvironmentForPath +) { + return `${v3EnvironmentPath(organization, project, environment)}/sandboxes`; +} + export function v3TasksStreamingPath( organization: OrgForPath, project: ProjectForPath, diff --git a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts index ea43bbe425..b84a153fed 100644 --- a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts +++ b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts @@ -4,9 +4,17 @@ import { CreateBackgroundWorkerRequestBody, QueueManifest, TaskResource, + type SandboxMetadata, } from "@trigger.dev/core/v3"; -import { BackgroundWorkerId } from "@trigger.dev/core/v3/isomorphic"; -import type { BackgroundWorker, TaskQueue, TaskQueueType } from "@trigger.dev/database"; +import { BackgroundWorkerId, SandboxEnvironmentId } from "@trigger.dev/core/v3/isomorphic"; +import type { + BackgroundWorker, + BackgroundWorkerTask, + SandboxEnvironment, + TaskQueue, + TaskQueueType, + TaskTriggerSource, +} from "@trigger.dev/database"; import cronstrue from "cronstrue"; import { Prisma, PrismaClientOrTransaction } from "~/db.server"; import { sanitizeQueueName } from "~/models/taskQueue.server"; @@ -26,6 +34,8 @@ import { projectPubSub } from "./projectPubSub.server"; import { tryCatch } from "@trigger.dev/core/v3"; import { engine } from "../runEngine.server"; import { scheduleEngine } from "../scheduleEngine.server"; +import pMap from "p-map"; +import { createHash } from "node:crypto"; export class CreateBackgroundWorkerService extends BaseService { public async call( @@ -120,7 +130,7 @@ export class CreateBackgroundWorkerService extends BaseService { throw new ServiceValidationError("Error creating background worker files"); } - const [resourcesError] = await tryCatch( + const [resourcesError, 
workerResources] = await tryCatch( createWorkerResources( body.metadata, backgroundWorker, @@ -157,6 +167,25 @@ export class CreateBackgroundWorkerService extends BaseService { throw new ServiceValidationError("Error syncing declarative schedules"); } + const [sandboxEnvironmentsError] = await tryCatch( + syncSandboxEnvironments( + body.metadata.tasks, + workerResources.tasks, + backgroundWorker, + environment, + this._prisma + ) + ); + + if (sandboxEnvironmentsError) { + logger.error("Error syncing sandbox environments", { + error: sandboxEnvironmentsError, + backgroundWorker, + environment, + }); + throw new ServiceValidationError("Error syncing sandbox environments"); + } + const [updateConcurrencyLimitsError] = await tryCatch( updateEnvConcurrencyLimits(environment) ); @@ -215,7 +244,16 @@ export async function createWorkerResources( const queues = await createWorkerQueues(metadata, worker, environment, prisma); // Create the tasks - await createWorkerTasks(metadata, queues, worker, environment, prisma, tasksToBackgroundFiles); + const tasks = await createWorkerTasks( + metadata, + queues, + worker, + environment, + prisma, + tasksToBackgroundFiles + ); + + return { queues, tasks: tasks.filter(Boolean) }; } async function createWorkerTasks( @@ -226,16 +264,12 @@ async function createWorkerTasks( prisma: PrismaClientOrTransaction, tasksToBackgroundFiles?: Map ) { - // Create tasks in chunks of 20 - const CHUNK_SIZE = 20; - for (let i = 0; i < metadata.tasks.length; i += CHUNK_SIZE) { - const chunk = metadata.tasks.slice(i, i + CHUNK_SIZE); - await Promise.all( - chunk.map((task) => - createWorkerTask(task, queues, worker, environment, prisma, tasksToBackgroundFiles) - ) - ); - } + // Use pMap with concurrency 20 to process all tasks + return pMap( + metadata.tasks, + (task) => createWorkerTask(task, queues, worker, environment, prisma, tasksToBackgroundFiles), + { concurrency: 20 } + ); } async function createWorkerTask( @@ -264,7 +298,7 @@ async function 
createWorkerTask( ); } - await prisma.backgroundWorkerTask.create({ + return await prisma.backgroundWorkerTask.create({ data: { friendlyId: generateFriendlyId("task"), projectId: worker.projectId, @@ -277,7 +311,7 @@ async function createWorkerTask( retryConfig: task.retry, queueConfig: task.queue, machineConfig: task.machine, - triggerSource: task.triggerSource === "schedule" ? "SCHEDULED" : "STANDARD", + triggerSource: backgroundWorkerTaskSource(task), fileId: tasksToBackgroundFiles?.get(task.id) ?? null, maxDurationInSeconds: task.maxDuration ? clampMaxDuration(task.maxDuration) : null, queueId: queue.id, @@ -292,6 +326,13 @@ async function createWorkerTask( task, worker, }); + + return await prisma.backgroundWorkerTask.findFirst({ + where: { + workerId: worker.id, + slug: task.id, + }, + }); } else { logger.error("Prisma Error creating background worker task", { error: { @@ -322,6 +363,16 @@ async function createWorkerTask( } } +function backgroundWorkerTaskSource(task: TaskResource): TaskTriggerSource { + if (task.triggerSource === "sandbox") { + return "SANDBOX"; + } else if (task.triggerSource === "schedule") { + return "SCHEDULED"; + } else { + return "STANDARD"; + } +} + async function createWorkerQueues( metadata: BackgroundWorkerMetadata, worker: BackgroundWorker, @@ -652,6 +703,138 @@ export async function syncDeclarativeSchedules( } } +export async function syncSandboxEnvironments( + taskResources: TaskResource[], + tasks: BackgroundWorkerTask[], + worker: BackgroundWorker, + environment: AuthenticatedEnvironment, + prisma: PrismaClientOrTransaction +) { + const tasksWithSandboxEnvironments = taskResources.filter((task) => task.sandbox); + + logger.info("Syncing sandbox environments", { + tasksWithSandboxEnvironments, + environment, + }); + + const existingProvisionedSandboxEnvironments = await prisma.sandboxEnvironment.findMany({ + where: { + runtimeEnvironmentId: environment.id, + type: "PROVISIONED", + }, + }); + + // Upsert sandbox environments by 
the deduplicationKey (which is the task.id) + return await pMap( + tasksWithSandboxEnvironments, + (task) => + upsertSandboxEnvironment( + task, + task.sandbox!, + tasks, + existingProvisionedSandboxEnvironments, + environment, + prisma + ), + { + concurrency: 4, + } + ); +} + +async function upsertSandboxEnvironment( + task: TaskResource, + sandbox: SandboxMetadata, + workerTasks: BackgroundWorkerTask[], + sandboxEnvironments: SandboxEnvironment[], + environment: AuthenticatedEnvironment, + prisma: PrismaClientOrTransaction +) { + const workerTask = workerTasks.find((t) => t.slug === task.id); + + if (!workerTask) { + throw new Error(`Worker task not found for task ${task.id}`); + } + + const contentHash = calculateSandboxContentHash(sandbox); + + const existingSandboxEnvironment = sandboxEnvironments.find( + (sandboxEnvironment) => sandboxEnvironment.deduplicationKey === task.id + ); + + if (existingSandboxEnvironment) { + if (existingSandboxEnvironment.contentHash !== contentHash) { + // If the content hash is different, we need to rebuild the sandbox environment + return await prisma.sandboxEnvironment.update({ + where: { + id: existingSandboxEnvironment.id, + }, + data: { + contentHash, + status: environment.type === "DEVELOPMENT" ? "DEPLOYED" : "PENDING", + packages: sandbox.packages, + systemPackages: sandbox.systemPackages, + runtime: sandbox.runtime, + backgroundWorkerTasks: { + connect: { + id: workerTask.id, + }, + }, + }, + }); + } else { + return await prisma.sandboxEnvironment.update({ + where: { + id: existingSandboxEnvironment.id, + }, + data: { + backgroundWorkerTasks: { + connect: { + id: workerTask.id, + }, + }, + }, + }); + } + } else { + const id = SandboxEnvironmentId.generate(); + + // Okay so we need to create the new sandbox environment + return await prisma.sandboxEnvironment.create({ + data: { + id: id.id, + status: environment.type === "DEVELOPMENT" ? 
"DEPLOYED" : "PENDING", + friendlyId: id.friendlyId, + deduplicationKey: task.id, + runtimeEnvironmentId: environment.id, + organizationId: environment.organizationId, + projectId: environment.projectId, + runtime: sandbox.runtime, + packages: sandbox.packages, + systemPackages: sandbox.systemPackages, + contentHash, + backgroundWorkerTasks: { + connect: { + id: workerTask.id, + }, + }, + }, + }); + } +} + +function calculateSandboxContentHash(sandbox: SandboxMetadata) { + const { packages, systemPackages, runtime } = sandbox; + const keyMaterial: string[] = []; + + // Add packages, systemPackages, and runtime to the key material, making sure to sort the arrays + keyMaterial.push(...[...packages].sort()); + keyMaterial.push(...[...systemPackages].sort()); + keyMaterial.push(runtime); + + return createHash("sha256").update(keyMaterial.join("")).digest("hex"); +} + export async function createBackgroundFiles( files: Array | undefined, worker: BackgroundWorker, diff --git a/apps/webapp/app/v3/services/triggerTask.server.ts b/apps/webapp/app/v3/services/triggerTask.server.ts index 235dddd7d6..0c27390180 100644 --- a/apps/webapp/app/v3/services/triggerTask.server.ts +++ b/apps/webapp/app/v3/services/triggerTask.server.ts @@ -13,6 +13,7 @@ import { determineEngineVersion } from "../engineVersion.server"; import { tracer } from "../tracer.server"; import { WithRunEngine } from "./baseService.server"; import { TriggerTaskServiceV1 } from "./triggerTaskV1.server"; +import { DefaultEnvironmentVariablesProcessor } from "~/runEngine/concerns/environmentVariablesProcessor.server"; export type TriggerTaskServiceOptions = { idempotencyKey?: string; @@ -109,6 +110,7 @@ export class TriggerTaskService extends WithRunEngine { traceEventConcern, tracer: tracer, metadataMaximumSize: env.TASK_RUN_METADATA_MAXIMUM_SIZE, + environmentVariablesProcessor: new DefaultEnvironmentVariablesProcessor(env.ENCRYPTION_KEY), }); return await service.call({ diff --git a/apps/webapp/package.json 
b/apps/webapp/package.json index d1b2dacda3..a503d18e06 100644 --- a/apps/webapp/package.json +++ b/apps/webapp/package.json @@ -15,7 +15,7 @@ "lint": "eslint --cache --cache-location ./node_modules/.cache/eslint .", "start": "cross-env NODE_ENV=production node --max-old-space-size=8192 ./build/server.js", "start:local": "cross-env node --max-old-space-size=8192 ./build/server.js", - "typecheck": "tsc --noEmit -p ./tsconfig.check.json", + "typecheck": "cross-env NODE_OPTIONS=--max-old-space-size=8192 tsc --noEmit -p ./tsconfig.check.json", "db:seed": "node prisma/seed.js", "db:seed:local": "ts-node prisma/seed.ts", "build:db:populate": "esbuild --platform=node --bundle --minify --format=cjs ./prisma/populate.ts --outdir=prisma", diff --git a/apps/webapp/tailwind.config.js b/apps/webapp/tailwind.config.js index 7ca81fd8ee..b36bf13709 100644 --- a/apps/webapp/tailwind.config.js +++ b/apps/webapp/tailwind.config.js @@ -158,7 +158,9 @@ const tasks = colors.blue[500]; const runs = colors.indigo[500]; const batches = colors.pink[500]; const schedules = colors.yellow[500]; +const sandboxes = colors.amber[400]; const queues = colors.purple[500]; +const waitpointTokens = colors.sky[500]; const deployments = colors.green[500]; const tests = colors.lime[500]; const apiKeys = colors.amber[500]; @@ -234,6 +236,8 @@ module.exports = { runs, batches, schedules, + sandboxes, + waitpointTokens, queues, deployments, tests, diff --git a/apps/webapp/test/engine/triggerTask.test.ts b/apps/webapp/test/engine/triggerTask.test.ts index aa0e059156..4ef7b2e36f 100644 --- a/apps/webapp/test/engine/triggerTask.test.ts +++ b/apps/webapp/test/engine/triggerTask.test.ts @@ -18,7 +18,7 @@ import { RunEngine } from "@internal/run-engine"; import { setupAuthenticatedEnvironment, setupBackgroundWorker } from "@internal/run-engine/tests"; import { assertNonNullable, containerTest } from "@internal/testcontainers"; import { trace } from "@opentelemetry/api"; -import { IOPacket } from 
"@trigger.dev/core/v3"; +import { IOPacket, TaskRunEnvironmentVariablesConfig } from "@trigger.dev/core/v3"; import { TaskRun } from "@trigger.dev/database"; import { IdempotencyKeyConcern } from "~/runEngine/concerns/idempotencyKeys.server"; import { DefaultQueueManager } from "~/runEngine/concerns/queues.server"; @@ -40,6 +40,7 @@ import { import { RunEngineTriggerTaskService } from "../../app/runEngine/services/triggerTask.server"; import { promiseWithResolvers } from "@trigger.dev/core"; import { setTimeout } from "node:timers/promises"; +import { DefaultEnvironmentVariablesProcessor } from "~/runEngine/concerns/environmentVariablesProcessor.server"; vi.setConfig({ testTimeout: 30_000 }); // 30 seconds timeout @@ -200,6 +201,9 @@ describe("RunEngineTriggerTaskService", () => { traceEventConcern: new MockTraceEventConcern(), tracer: trace.getTracer("test", "0.0.0"), metadataMaximumSize: 1024 * 1024 * 1, // 1MB + environmentVariablesProcessor: new DefaultEnvironmentVariablesProcessor( + "7d046af878577f9c85295ce92324ec79" + ), }); const result = await triggerTaskService.call({ @@ -224,6 +228,7 @@ describe("RunEngineTriggerTaskService", () => { expect(run?.engine).toBe("V2"); expect(run?.queuedAt).toBeDefined(); expect(run?.queue).toBe(`task/${taskIdentifier}`); + expect(run?.environmentVariablesConfig).toBeNull(); // Lets make sure the task is in the queue const queueLength = await engine.runQueue.lengthOfQueue( @@ -291,6 +296,9 @@ describe("RunEngineTriggerTaskService", () => { traceEventConcern: new MockTraceEventConcern(), tracer: trace.getTracer("test", "0.0.0"), metadataMaximumSize: 1024 * 1024 * 1, // 1MB + environmentVariablesProcessor: new DefaultEnvironmentVariablesProcessor( + "7d046af878577f9c85295ce92324ec79" + ), }); const result = await triggerTaskService.call({ @@ -320,6 +328,7 @@ describe("RunEngineTriggerTaskService", () => { expect(run?.engine).toBe("V2"); expect(run?.queuedAt).toBeDefined(); expect(run?.queue).toBe(`task/${taskIdentifier}`); + 
expect(run?.environmentVariablesConfig).toBeNull(); // Lets make sure the task is in the queue const queueLength = await engine.runQueue.lengthOfQueue( @@ -472,6 +481,9 @@ describe("RunEngineTriggerTaskService", () => { tracer: trace.getTracer("test", "0.0.0"), metadataMaximumSize: 1024 * 1024 * 1, // 1MB triggerRacepointSystem, + environmentVariablesProcessor: new DefaultEnvironmentVariablesProcessor( + "7d046af878577f9c85295ce92324ec79" + ), }); const idempotencyKey = "test-idempotency-key"; @@ -655,6 +667,9 @@ describe("RunEngineTriggerTaskService", () => { traceEventConcern: new MockTraceEventConcern(), tracer: trace.getTracer("test", "0.0.0"), metadataMaximumSize: 1024 * 1024 * 1, // 1MB + environmentVariablesProcessor: new DefaultEnvironmentVariablesProcessor( + "7d046af878577f9c85295ce92324ec79" + ), }); // Test case 1: Trigger with lockToVersion but no specific queue @@ -731,4 +746,111 @@ describe("RunEngineTriggerTaskService", () => { await engine.quit(); } ); + + containerTest( + "should process environment variables correctly", + async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0005, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const taskIdentifier = "test-task"; + + //create background worker + await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + + const queuesManager = new DefaultQueueManager(prisma, engine); + + const idempotencyKeyConcern = new IdempotencyKeyConcern( + prisma, + engine, + new MockTraceEventConcern() + ); + + 
const triggerTaskService = new RunEngineTriggerTaskService({ + engine, + prisma, + runNumberIncrementer: new MockRunNumberIncrementer(), + payloadProcessor: new MockPayloadProcessor(), + queueConcern: queuesManager, + idempotencyKeyConcern, + validator: new MockTriggerTaskValidator(), + traceEventConcern: new MockTraceEventConcern(), + tracer: trace.getTracer("test", "0.0.0"), + metadataMaximumSize: 1024 * 1024 * 1, // 1MB + environmentVariablesProcessor: new DefaultEnvironmentVariablesProcessor( + "7d046af878577f9c85295ce92324ec79" + ), + }); + + const result = await triggerTaskService.call({ + taskId: taskIdentifier, + environment: authenticatedEnvironment, + body: { + payload: { test: "test" }, + options: { + env: { + variables: { + TEST_VARIABLE: "test-value", + }, + whitelist: ["OPENAI_API_KEY"], + }, + }, + }, + }); + + expect(result).toBeDefined(); + expect(result?.run.friendlyId).toBeDefined(); + expect(result?.run.status).toBe("PENDING"); + expect(result?.isCached).toBe(false); + + const run = await prisma.taskRun.findUnique({ + where: { + id: result?.run.id, + }, + }); + + expect(run).toBeDefined(); + expect(run?.friendlyId).toBe(result?.run.friendlyId); + expect(run?.engine).toBe("V2"); + expect(run?.environmentVariablesConfig).toBeDefined(); + + const config = TaskRunEnvironmentVariablesConfig.safeParse(run?.environmentVariablesConfig); + expect(config.success).toBe(true); + expect(config.data?.variables).toBeDefined(); + expect(config.data?.variables?.TEST_VARIABLE).toBeDefined(); + expect(config.data?.whitelist).toBeDefined(); + expect(config.data?.whitelist?.length).toBe(1); + expect(config.data?.whitelist?.[0]).toBe("OPENAI_API_KEY"); + + await engine.quit(); + } + ); }); diff --git a/internal-packages/database/prisma/migrations/20251015092752_add_sandbox_environments/migration.sql b/internal-packages/database/prisma/migrations/20251015092752_add_sandbox_environments/migration.sql new file mode 100644 index 0000000000..f4e1719bc8 --- /dev/null +++ 
b/internal-packages/database/prisma/migrations/20251015092752_add_sandbox_environments/migration.sql @@ -0,0 +1,48 @@ +-- CreateEnum +CREATE TYPE "public"."SandboxStatus" AS ENUM ('PENDING', 'DEPLOYING', 'DEPLOYED', 'FAILED', 'CANCELED'); + +-- CreateEnum +CREATE TYPE "public"."SandboxType" AS ENUM ('PROVISIONED', 'PROGRAMMATIC'); + +-- AlterEnum +ALTER TYPE "public"."TaskTriggerSource" ADD VALUE 'SANDBOX'; + +-- AlterTable +ALTER TABLE "public"."BackgroundWorkerTask" ADD COLUMN "sandboxEnvironmentId" TEXT; + +-- CreateTable +CREATE TABLE "public"."SandboxEnvironment" ( + "id" TEXT NOT NULL, + "type" "public"."SandboxType" NOT NULL DEFAULT 'PROVISIONED', + "friendlyId" TEXT NOT NULL, + "deduplicationKey" TEXT NOT NULL, + "packages" TEXT[], + "systemPackages" TEXT[], + "runtime" TEXT NOT NULL, + "imageReference" TEXT, + "imageVersion" TEXT, + "contentHash" TEXT, + "status" "public"."SandboxStatus" NOT NULL DEFAULT 'PENDING', + "organizationId" TEXT NOT NULL, + "projectId" TEXT NOT NULL, + "runtimeEnvironmentId" TEXT NOT NULL, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + + CONSTRAINT "SandboxEnvironment_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE UNIQUE INDEX "SandboxEnvironment_friendlyId_key" ON "public"."SandboxEnvironment"("friendlyId"); + +-- CreateIndex +CREATE INDEX "SandboxEnvironment_runtimeEnvironmentId_idx" ON "public"."SandboxEnvironment"("runtimeEnvironmentId"); + +-- CreateIndex +CREATE UNIQUE INDEX "SandboxEnvironment_runtimeEnvironmentId_deduplicationKey_key" ON "public"."SandboxEnvironment"("runtimeEnvironmentId", "deduplicationKey"); + +-- AddForeignKey +ALTER TABLE "public"."BackgroundWorkerTask" ADD CONSTRAINT "BackgroundWorkerTask_sandboxEnvironmentId_fkey" FOREIGN KEY ("sandboxEnvironmentId") REFERENCES "public"."SandboxEnvironment"("id") ON DELETE SET NULL ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "public"."SandboxEnvironment" ADD CONSTRAINT 
"SandboxEnvironment_runtimeEnvironmentId_fkey" FOREIGN KEY ("runtimeEnvironmentId") REFERENCES "public"."RuntimeEnvironment"("id") ON DELETE CASCADE ON UPDATE CASCADE; diff --git a/internal-packages/database/prisma/migrations/20251015150016_add_environment_variables_config_to_task_run/migration.sql b/internal-packages/database/prisma/migrations/20251015150016_add_environment_variables_config_to_task_run/migration.sql new file mode 100644 index 0000000000..30d6d4e660 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20251015150016_add_environment_variables_config_to_task_run/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "public"."TaskRun" ADD COLUMN "environmentVariablesConfig" JSONB; \ No newline at end of file diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index 105dff4bef..a880e08e69 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -329,6 +329,7 @@ model RuntimeEnvironment { workerInstances WorkerInstance[] waitpointTags WaitpointTag[] BulkActionGroup BulkActionGroup[] + sandboxEnvironments SandboxEnvironment[] @@unique([projectId, slug, orgMemberId]) @@unique([projectId, shortcode]) @@ -549,6 +550,9 @@ model BackgroundWorkerTask { triggerSource TaskTriggerSource @default(STANDARD) + sandboxEnvironment SandboxEnvironment? @relation(fields: [sandboxEnvironmentId], references: [id], onDelete: SetNull) + sandboxEnvironmentId String? + payloadSchema Json? @@unique([workerId, slug]) @@ -560,6 +564,7 @@ model BackgroundWorkerTask { enum TaskTriggerSource { STANDARD SCHEDULED + SANDBOX } model TaskRun { @@ -651,12 +656,13 @@ model TaskRun { concurrencyKey String? - delayUntil DateTime? - queuedAt DateTime? - ttl String? - expiredAt DateTime? - maxAttempts Int? - lockedRetryConfig Json? + delayUntil DateTime? + queuedAt DateTime? + ttl String? + expiredAt DateTime? + maxAttempts Int? + lockedRetryConfig Json? 
+ environmentVariablesConfig Json? /// optional token that can be used to authenticate the task run oneTimeUseToken String? @@ -2325,3 +2331,49 @@ model ConnectedGithubRepository { @@unique([projectId]) @@index([repositoryId]) } + +model SandboxEnvironment { + id String @id @default(cuid()) + type SandboxType @default(PROVISIONED) + + friendlyId String @unique + + deduplicationKey String + + packages String[] + systemPackages String[] + runtime String + + imageReference String? + imageVersion String? + contentHash String? // for deduplication + + status SandboxStatus @default(PENDING) + + organizationId String + projectId String + + runtimeEnvironment RuntimeEnvironment @relation(fields: [runtimeEnvironmentId], references: [id], onDelete: Cascade) + runtimeEnvironmentId String + + backgroundWorkerTasks BackgroundWorkerTask[] + + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + @@unique([runtimeEnvironmentId, deduplicationKey]) + @@index([runtimeEnvironmentId]) +} + +enum SandboxStatus { + PENDING + DEPLOYING + DEPLOYED + FAILED + CANCELED +} + +enum SandboxType { + PROVISIONED + PROGRAMMATIC +} diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index ca8628c952..86b0d071a4 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -389,6 +389,7 @@ export class RunEngine { createdAt, bulkActionId, planType, + environmentVariablesConfig, }: TriggerParams, tx?: PrismaClientOrTransaction ): Promise { @@ -469,6 +470,7 @@ export class RunEngine { createdAt, bulkActionGroupIds: bulkActionId ? 
[bulkActionId] : undefined, planType, + environmentVariablesConfig, executionSnapshots: { create: { engine: "V2", diff --git a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts index a884ca9ba6..8703568eaa 100644 --- a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts @@ -614,6 +614,8 @@ export class RunAttemptSystem { id: BatchId.toFriendlyId(updatedRun.batchId), } : undefined, + executor: + task.source && task.source === "SANDBOX" ? "SANDBOX_EXECUTOR" : "TASK_RUN_EXECUTOR", }; return { run: updatedRun, snapshot, execution }; @@ -1617,6 +1619,7 @@ export class RunAttemptSystem { slug: true, filePath: true, exportName: true, + triggerSource: true, }, }); @@ -1624,6 +1627,7 @@ export class RunAttemptSystem { id: task.slug, filePath: task.filePath, exportName: task.exportName ?? undefined, + source: task.triggerSource, }; }); diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index 040cb3cd09..5d864739e9 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -5,6 +5,7 @@ import { MachinePreset, MachinePresetName, RetryOptions, + TaskRunEnvironmentVariablesConfig, TriggerTraceContext, } from "@trigger.dev/core/v3"; import { PrismaClient, PrismaReplicaClient } from "@trigger.dev/database"; @@ -148,6 +149,7 @@ export type TriggerParams = { createdAt?: Date; bulkActionId?: string; planType?: string; + environmentVariablesConfig?: TaskRunEnvironmentVariablesConfig; }; export type EngineWorker = Worker; diff --git a/packages/cli-v3/src/entryPoints/dev-run-worker.ts b/packages/cli-v3/src/entryPoints/dev-run-worker.ts index 9239f2b2bd..18b19a5330 100644 --- a/packages/cli-v3/src/entryPoints/dev-run-worker.ts +++ b/packages/cli-v3/src/entryPoints/dev-run-worker.ts @@ -57,6 
+57,7 @@ import { UsageTimeoutManager, StandardTraceContextManager, StandardHeartbeatsManager, + SandboxTaskExecutor, } from "@trigger.dev/core/v3/workers"; import { ZodIpcConnection } from "@trigger.dev/core/v3/zodIpc"; import { readFile } from "node:fs/promises"; @@ -65,7 +66,7 @@ import sourceMapSupport from "source-map-support"; import { env } from "std-env"; import { normalizeImportPath } from "../utilities/normalizeImportPath.js"; import { VERSION } from "../version.js"; -import { promiseWithResolvers } from "@trigger.dev/core/utils"; +import { promiseWithResolvers, tryCatch } from "@trigger.dev/core/utils"; sourceMapSupport.install({ handleUncaughtExceptions: false, @@ -280,6 +281,49 @@ async function bootstrap() { return bootstrapCache; } +async function sandboxBootstrap() { + return await runTimelineMetrics.measureMetric("trigger.dev/start", "bootstrap", {}, async () => { + log("Bootstrapping worker"); + + const tracingSDK = new TracingSDK({ + url: env.TRIGGER_OTEL_EXPORTER_OTLP_ENDPOINT ?? "http://0.0.0.0:4318", + instrumentations: [], + exporters: [], + logExporters: [], + diagLogLevel: (env.TRIGGER_OTEL_LOG_LEVEL as TracingDiagnosticLogLevel) ?? 
"none", + forceFlushTimeoutMillis: 30_000, + }); + + const otelTracer: Tracer = tracingSDK.getTracer("trigger-dev-worker", VERSION); + const otelLogger: Logger = tracingSDK.getLogger("trigger-dev-worker", VERSION); + + const tracer = new TriggerTracer({ tracer: otelTracer, logger: otelLogger }); + const consoleInterceptor = new ConsoleInterceptor( + otelLogger, + true, + false, + OTEL_LOG_ATTRIBUTE_COUNT_LIMIT + ); + + const otelTaskLogger = new OtelTaskLogger({ + logger: otelLogger, + tracer: tracer, + level: "info", + maxAttributeCount: OTEL_LOG_ATTRIBUTE_COUNT_LIMIT, + }); + + logger.setGlobalTaskLogger(otelTaskLogger); + + log("Bootstrapped worker"); + + return { + tracer, + tracingSDK, + consoleInterceptor, + }; + }); +} + let _execution: TaskRunExecution | undefined; let _isRunning = false; let _isCancelled = false; @@ -330,72 +374,123 @@ const zodIpc = new ZodIpcConnection({ { execution, traceContext, metadata, metrics, env, isWarmStart }, sender ) => { - if (env) { - populateEnv(env, { - override: true, - previousEnv: _lastEnv, - }); + if (execution.executor === "SANDBOX_EXECUTOR") { + if (env) { + populateEnv(env, { + override: true, + previousEnv: _lastEnv, + }); - _lastEnv = env; - } + _lastEnv = env; + } - log(`[${new Date().toISOString()}] Received EXECUTE_TASK_RUN`, execution); + log(`[${new Date().toISOString()}] Received SANDBOX_EXECUTOR`, execution); - if (_lastFlushPromise) { - const now = performance.now(); + if (_lastFlushPromise) { + const now = performance.now(); - await _lastFlushPromise; + await _lastFlushPromise; - const duration = performance.now() - now; + const duration = performance.now() - now; - log(`[${new Date().toISOString()}] Awaited last flush in ${duration}ms`); - } + log(`[${new Date().toISOString()}] Awaited last flush in ${duration}ms`); - resetExecutionEnvironment(); + resetExecutionEnvironment(); - standardTraceContextManager.traceContext = traceContext; - 
standardRunTimelineMetricsManager.registerMetricsFromExecution(metrics, isWarmStart); + standardTraceContextManager.traceContext = traceContext; + standardRunTimelineMetricsManager.registerMetricsFromExecution(metrics, isWarmStart); - if (_isRunning) { - logError("Worker is already running a task"); + if (_isRunning) { + logError("Worker is already running a task"); - await sender.send("TASK_RUN_COMPLETED", { - execution, - result: { - ok: false, - id: execution.run.id, - error: { - type: "INTERNAL_ERROR", - code: TaskRunErrorCodes.TASK_ALREADY_RUNNING, - }, - usage: { - durationMs: 0, - }, - flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(), - }, - }); + await sender.send("TASK_RUN_COMPLETED", { + execution, + result: { + ok: false, + id: execution.run.id, + error: { + type: "INTERNAL_ERROR", + code: TaskRunErrorCodes.TASK_ALREADY_RUNNING, + }, + usage: { + durationMs: 0, + }, + flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(), + }, + }); - return; - } + return; + } - const ctx = TaskRunContext.parse(execution); + const [bootstrapError, bootstrapResult] = await tryCatch(sandboxBootstrap()); - taskContext.setGlobalTaskContext({ - ctx, - worker: metadata, - isWarmStart: isWarmStart ?? 
false, - }); + if (bootstrapError) { + logError("Failed to bootstrap sandbox", bootstrapError); + + await sender.send("TASK_RUN_COMPLETED", { + execution, + result: { + ok: false, + id: execution.run.id, + error: { + type: "INTERNAL_ERROR", + code: TaskRunErrorCodes.CONFIGURED_INCORRECTLY, + message: bootstrapError.message, + }, + usage: { + durationMs: 0, + }, + flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(), + }, + }); + + return; + } + + const { tracer, tracingSDK, consoleInterceptor } = bootstrapResult; + + runMetadataManager.runId = execution.run.id; + runMetadataManager.runIdIsRoot = typeof execution.run.rootTaskRunId === "undefined"; + + _executionCount++; + + const executor = new SandboxTaskExecutor({ + tracingSDK, + tracer, + consoleInterceptor, + }); + + return; + } + } else { + if (env) { + populateEnv(env, { + override: true, + previousEnv: _lastEnv, + }); + + _lastEnv = env; + } + + log(`[${new Date().toISOString()}] Received EXECUTE_TASK_RUN`, execution); + + if (_lastFlushPromise) { + const now = performance.now(); + + await _lastFlushPromise; - try { - const { tracer, tracingSDK, consoleInterceptor, config, workerManifest } = - await bootstrap(); + const duration = performance.now() - now; - _tracingSDK = tracingSDK; + log(`[${new Date().toISOString()}] Awaited last flush in ${duration}ms`); + } + + resetExecutionEnvironment(); - const taskManifest = workerManifest.tasks.find((t) => t.id === execution.task.id); + standardTraceContextManager.traceContext = traceContext; + standardRunTimelineMetricsManager.registerMetricsFromExecution(metrics, isWarmStart); - if (!taskManifest) { - logError(`Could not find task ${execution.task.id}`); + if (_isRunning) { + logError("Worker is already running a task"); await sender.send("TASK_RUN_COMPLETED", { execution, @@ -404,8 +499,7 @@ const zodIpc = new ZodIpcConnection({ id: execution.run.id, error: { type: "INTERNAL_ERROR", - code: TaskRunErrorCodes.COULD_NOT_FIND_TASK, - message: `Could not 
find task ${execution.task.id}. Make sure the task is exported and the ID is correct.`, + code: TaskRunErrorCodes.TASK_ALREADY_RUNNING, }, usage: { durationMs: 0, @@ -417,49 +511,24 @@ const zodIpc = new ZodIpcConnection({ return; } - // First attempt to get the task from the resource catalog - let task = resourceCatalog.getTask(execution.task.id); + const ctx = TaskRunContext.parse(execution); - if (!task) { - log(`Could not find task ${execution.task.id} in resource catalog, importing...`); + taskContext.setGlobalTaskContext({ + ctx, + worker: metadata, + isWarmStart: isWarmStart ?? false, + }); - try { - await runTimelineMetrics.measureMetric( - "trigger.dev/start", - "import", - { - entryPoint: taskManifest.entryPoint, - file: taskManifest.filePath, - }, - async () => { - const beforeImport = performance.now(); - resourceCatalog.setCurrentFileContext( - taskManifest.entryPoint, - taskManifest.filePath - ); - - // Load init file if it exists - if (workerManifest.initEntryPoint) { - try { - await import(normalizeImportPath(workerManifest.initEntryPoint)); - log(`Loaded init file from ${workerManifest.initEntryPoint}`); - } catch (err) { - logError(`Failed to load init file`, err); - throw err; - } - } + try { + const { tracer, tracingSDK, consoleInterceptor, config, workerManifest } = + await bootstrap(); - await import(normalizeImportPath(taskManifest.entryPoint)); - resourceCatalog.clearCurrentFileContext(); - const durationMs = performance.now() - beforeImport; + _tracingSDK = tracingSDK; - log( - `Imported task ${execution.task.id} [${taskManifest.entryPoint}] in ${durationMs}ms` - ); - } - ); - } catch (err) { - logError(`Failed to import task ${execution.task.id}`, err); + const taskManifest = workerManifest.tasks.find((t) => t.id === execution.task.id); + + if (!taskManifest) { + logError(`Could not find task ${execution.task.id}`); await sender.send("TASK_RUN_COMPLETED", { execution, @@ -468,9 +537,8 @@ const zodIpc = new ZodIpcConnection({ id: 
execution.run.id, error: { type: "INTERNAL_ERROR", - code: TaskRunErrorCodes.COULD_NOT_IMPORT_TASK, - message: err instanceof Error ? err.message : String(err), - stackTrace: err instanceof Error ? err.stack : undefined, + code: TaskRunErrorCodes.COULD_NOT_FIND_TASK, + message: `Could not find task ${execution.task.id}. Make sure the task is exported and the ID is correct.`, }, usage: { durationMs: 0, @@ -482,110 +550,176 @@ const zodIpc = new ZodIpcConnection({ return; } - // Now try and get the task again - task = resourceCatalog.getTask(execution.task.id); - } + // First attempt to get the task from the resource catalog + let task = resourceCatalog.getTask(execution.task.id); - if (!task) { - logError(`Could not find task ${execution.task.id}`); + if (!task) { + log(`Could not find task ${execution.task.id} in resource catalog, importing...`); - await sender.send("TASK_RUN_COMPLETED", { - execution, - result: { - ok: false, - id: execution.run.id, - error: { - type: "INTERNAL_ERROR", - code: TaskRunErrorCodes.COULD_NOT_FIND_EXECUTOR, - }, - usage: { - durationMs: 0, - }, - flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(), - }, - }); + try { + await runTimelineMetrics.measureMetric( + "trigger.dev/start", + "import", + { + entryPoint: taskManifest.entryPoint, + file: taskManifest.filePath, + }, + async () => { + const beforeImport = performance.now(); + resourceCatalog.setCurrentFileContext( + taskManifest.entryPoint, + taskManifest.filePath + ); + + // Load init file if it exists + if (workerManifest.initEntryPoint) { + try { + await import(normalizeImportPath(workerManifest.initEntryPoint)); + log(`Loaded init file from ${workerManifest.initEntryPoint}`); + } catch (err) { + logError(`Failed to load init file`, err); + throw err; + } + } - return; - } + await import(normalizeImportPath(taskManifest.entryPoint)); + resourceCatalog.clearCurrentFileContext(); + const durationMs = performance.now() - beforeImport; - runMetadataManager.runId = 
execution.run.id; - runMetadataManager.runIdIsRoot = typeof execution.run.rootTaskRunId === "undefined"; + log( + `Imported task ${execution.task.id} [${taskManifest.entryPoint}] in ${durationMs}ms` + ); + } + ); + } catch (err) { + logError(`Failed to import task ${execution.task.id}`, err); + + await sender.send("TASK_RUN_COMPLETED", { + execution, + result: { + ok: false, + id: execution.run.id, + error: { + type: "INTERNAL_ERROR", + code: TaskRunErrorCodes.COULD_NOT_IMPORT_TASK, + message: err instanceof Error ? err.message : String(err), + stackTrace: err instanceof Error ? err.stack : undefined, + }, + usage: { + durationMs: 0, + }, + flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(), + }, + }); - _executionCount++; + return; + } - const executor = new TaskExecutor(task, { - tracer, - tracingSDK, - consoleInterceptor, - retries: config.retries, - isWarmStart, - executionCount: _executionCount, - }); + // Now try and get the task again + task = resourceCatalog.getTask(execution.task.id); + } - try { - _execution = execution; - _isRunning = true; + if (!task) { + logError(`Could not find task ${execution.task.id}`); + + await sender.send("TASK_RUN_COMPLETED", { + execution, + result: { + ok: false, + id: execution.run.id, + error: { + type: "INTERNAL_ERROR", + code: TaskRunErrorCodes.COULD_NOT_FIND_EXECUTOR, + }, + usage: { + durationMs: 0, + }, + flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(), + }, + }); - standardHeartbeatsManager.startHeartbeat(attemptKey(execution)); + return; + } - runMetadataManager.startPeriodicFlush( - getNumberEnvVar("TRIGGER_RUN_METADATA_FLUSH_INTERVAL", 1000) - ); + runMetadataManager.runId = execution.run.id; + runMetadataManager.runIdIsRoot = typeof execution.run.rootTaskRunId === "undefined"; - devUsageManager.setInitialState({ - cpuTime: execution.run.durationMs ?? 0, - costInCents: execution.run.costInCents ?? 
0, + _executionCount++; + + const executor = new TaskExecutor(task, { + tracer, + tracingSDK, + consoleInterceptor, + retries: config.retries, + isWarmStart, + executionCount: _executionCount, }); - _executionMeasurement = usage.start(); + try { + _execution = execution; + _isRunning = true; - const timeoutController = timeout.abortAfterTimeout(execution.run.maxDuration); + standardHeartbeatsManager.startHeartbeat(attemptKey(execution)); - const signal = AbortSignal.any([_cancelController.signal, timeoutController.signal]); + runMetadataManager.startPeriodicFlush( + getNumberEnvVar("TRIGGER_RUN_METADATA_FLUSH_INTERVAL", 1000) + ); - const { result } = await executor.execute(execution, ctx, signal); + devUsageManager.setInitialState({ + cpuTime: execution.run.durationMs ?? 0, + costInCents: execution.run.costInCents ?? 0, + }); - if (_isRunning && !_isCancelled) { - const usageSample = usage.stop(_executionMeasurement); + _executionMeasurement = usage.start(); - return sender.send("TASK_RUN_COMPLETED", { - execution, - result: { - ...result, - usage: { - durationMs: usageSample.cpuTime, + const timeoutController = timeout.abortAfterTimeout(execution.run.maxDuration); + + const signal = AbortSignal.any([_cancelController.signal, timeoutController.signal]); + + const { result } = await executor.execute(execution, ctx, signal); + + if (_isRunning && !_isCancelled) { + const usageSample = usage.stop(_executionMeasurement); + + return sender.send("TASK_RUN_COMPLETED", { + execution, + result: { + ...result, + usage: { + durationMs: usageSample.cpuTime, + }, + flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(), }, - flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(), - }, - }); + }); + } + } finally { + standardHeartbeatsManager.stopHeartbeat(); + + _execution = undefined; + _isRunning = false; + log(`[${new Date().toISOString()}] Task run completed`); } - } finally { - standardHeartbeatsManager.stopHeartbeat(); + } catch (err) { + 
logError("Failed to execute task", err); - _execution = undefined; - _isRunning = false; - log(`[${new Date().toISOString()}] Task run completed`); - } - } catch (err) { - logError("Failed to execute task", err); - - await sender.send("TASK_RUN_COMPLETED", { - execution, - result: { - ok: false, - id: execution.run.id, - error: { - type: "INTERNAL_ERROR", - code: TaskRunErrorCodes.CONFIGURED_INCORRECTLY, - message: err instanceof Error ? err.message : String(err), - stackTrace: err instanceof Error ? err.stack : undefined, - }, - usage: { - durationMs: 0, + await sender.send("TASK_RUN_COMPLETED", { + execution, + result: { + ok: false, + id: execution.run.id, + error: { + type: "INTERNAL_ERROR", + code: TaskRunErrorCodes.CONFIGURED_INCORRECTLY, + message: err instanceof Error ? err.message : String(err), + stackTrace: err instanceof Error ? err.stack : undefined, + }, + usage: { + durationMs: 0, + }, + flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(), }, - flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(), - }, - }); + }); + } } }, CANCEL: async ({ timeoutInMs }) => { diff --git a/packages/core/src/v3/isomorphic/friendlyId.ts b/packages/core/src/v3/isomorphic/friendlyId.ts index 90fa31bd57..850f563608 100644 --- a/packages/core/src/v3/isomorphic/friendlyId.ts +++ b/packages/core/src/v3/isomorphic/friendlyId.ts @@ -91,6 +91,7 @@ export const BackgroundWorkerId = new IdUtil("worker"); export const CheckpointId = new IdUtil("checkpoint"); export const QueueId = new IdUtil("queue"); export const RunId = new IdUtil("run"); +export const SandboxEnvironmentId = new IdUtil("sandbox"); export const SnapshotId = new IdUtil("snapshot"); export const WaitpointId = new IdUtil("waitpoint"); export const BatchId = new IdUtil("batch"); diff --git a/packages/core/src/v3/schemas/api.ts b/packages/core/src/v3/schemas/api.ts index f8e12f62cc..c274bee8f3 100644 --- a/packages/core/src/v3/schemas/api.ts +++ b/packages/core/src/v3/schemas/api.ts @@ 
-149,6 +149,18 @@ export const RunTags = z.union([RunTag, RunTag.array()]); export type RunTags = z.infer; +export const TriggerTaskEnvironmentVariables = z.record(z.string()); + +export const TriggerTaskEnvironmentVariablesConfig = z.object({ + whitelist: z.array(z.string()).optional(), + blacklist: z.array(z.string()).optional(), + variables: z.record(z.string()).optional(), +}); + +export type TriggerTaskEnvironmentVariablesConfig = z.infer< + typeof TriggerTaskEnvironmentVariablesConfig +>; + export const TriggerTaskRequestBody = z.object({ payload: z.any(), context: z.any(), @@ -203,6 +215,7 @@ export const TriggerTaskRequestBody = z.object({ priority: z.number().optional(), bulkActionId: z.string().optional(), region: z.string().optional(), + env: TriggerTaskEnvironmentVariablesConfig.optional(), }) .optional(), }); diff --git a/packages/core/src/v3/schemas/common.ts b/packages/core/src/v3/schemas/common.ts index c1eb943fed..d05262d8de 100644 --- a/packages/core/src/v3/schemas/common.ts +++ b/packages/core/src/v3/schemas/common.ts @@ -257,6 +257,7 @@ export type GitMeta = z.infer; export const TaskRunExecutionTask = z.object({ id: z.string(), filePath: z.string(), + source: z.string().optional(), }); export type TaskRunExecutionTask = z.infer; @@ -341,6 +342,7 @@ export const TaskRunExecution = z.object({ traceContext: z.record(z.unknown()).optional(), }) ), + executor: z.string().default("TASK_RUN_EXECUTOR"), ...StaticTaskRunExecutionShape, }); diff --git a/packages/core/src/v3/schemas/resources.ts b/packages/core/src/v3/schemas/resources.ts index 08764906ed..8fe47dc3d6 100644 --- a/packages/core/src/v3/schemas/resources.ts +++ b/packages/core/src/v3/schemas/resources.ts @@ -1,5 +1,5 @@ import { z } from "zod"; -import { QueueManifest, RetryOptions, ScheduleMetadata } from "./schemas.js"; +import { QueueManifest, RetryOptions, SandboxMetadata, ScheduleMetadata } from "./schemas.js"; import { MachineConfig } from "./common.js"; export const TaskResource = 
z.object({ @@ -12,6 +12,7 @@ export const TaskResource = z.object({ machine: MachineConfig.optional(), triggerSource: z.string().optional(), schedule: ScheduleMetadata.optional(), + sandbox: SandboxMetadata.optional(), maxDuration: z.number().optional(), // JSONSchema type - using z.unknown() for runtime validation to accept JSONSchema7 payloadSchema: z.unknown().optional(), diff --git a/packages/core/src/v3/schemas/schemas.ts b/packages/core/src/v3/schemas/schemas.ts index 233068c0b7..7251d9ca41 100644 --- a/packages/core/src/v3/schemas/schemas.ts +++ b/packages/core/src/v3/schemas/schemas.ts @@ -180,6 +180,27 @@ export const ScheduleMetadata = z.object({ environments: z.array(EnvironmentType).optional(), }); +export const SandboxMetadata = z.object({ + packages: z.array(z.string()), + systemPackages: z.array(z.string()), + runtime: z.string(), +}); + +export type SandboxMetadata = z.infer; + +export const SandboxPayloadSchema = z.object({ + files: z.array( + z.object({ + path: z.string(), + content: z.string(), + }) + ), + entry: z.string().optional(), + input: z.unknown().optional(), +}); + +export type SandboxPayload = z.infer; + const taskMetadata = { id: z.string(), description: z.string().optional(), @@ -188,6 +209,7 @@ const taskMetadata = { machine: MachineConfig.optional(), triggerSource: z.string().optional(), schedule: ScheduleMetadata.optional(), + sandbox: SandboxMetadata.optional(), maxDuration: z.number().optional(), payloadSchema: z.unknown().optional(), }; @@ -310,3 +332,23 @@ export const TriggerTraceContext = z.object({ }); export type TriggerTraceContext = z.infer; + +export const TaskRunEnvironmentVariablesConfig = z.object({ + override: z.boolean().optional(), + whitelist: z.array(z.string()).optional(), + blacklist: z.array(z.string()).optional(), + variables: z + .record( + z.object({ + encryptor: z.string(), + value: z.object({ + tag: z.string(), + nonce: z.string(), + ciphertext: z.string(), + }), + }) + ) + .optional(), +}); + +export 
type TaskRunEnvironmentVariablesConfig = z.infer; diff --git a/packages/core/src/v3/types/tasks.ts b/packages/core/src/v3/types/tasks.ts index 67c80d40b4..822f439a98 100644 --- a/packages/core/src/v3/types/tasks.ts +++ b/packages/core/src/v3/types/tasks.ts @@ -686,6 +686,52 @@ export type TriggerJwtOptions = { expirationTime?: number | Date | string; }; +type TriggerEnvironmentVariables = Record; + +/** + * The options for defining the environment variables that will be passed to the run. + * + * @example + * ```ts + * { + * whitelist: ["OPENAI_API_KEY"], // Will pass the project's OPENAI_API_KEY environment variable to the run + * variables: { + * "MY_VARIABLE": "my-value", // Will define a new environment variable MY_VARIABLE with the value "my-value" in the run + * }, + * } + * ``` + * + * @example + * ```ts + * { + * "MY_VARIABLE": "my-value", // Will define a new environment variable MY_VARIABLE with the value "my-value" in the run + * } + * ``` + * + */ +export type TriggerEnvironmentVariablesOption = + | TriggerEnvironmentVariables + | { + /** + * Whitelist specific environment variables that will be passed to the run from your project's environment variables. + * + * By default, all project environment variables will NOT be passed to the run. + */ + whitelist?: string[]; + /** + * Blacklist specific environment variables that will not be passed to the run from your project's environment variables. + * + * If this is specified, the `whitelist` will be ignored, and all project environment variables will be passed to the run except the ones in the blacklist. + */ + blacklist?: string[]; + /** + * Override the environment variables that will be passed to the run from your project's environment variables or define new ones. + * + * These will override project environment variables or define new environment variables that will be passed to the run.
+ */ + variables?: TriggerEnvironmentVariables; + }; + export type TriggerOptions = { /** * A unique key that can be used to ensure that a task is only triggered once per key. @@ -872,6 +918,16 @@ export type TriggerOptions = { * ``` */ region?: string; + + /** + * The environment variables to pass to the run. + * + * @example + * ```ts + * await myTask.trigger({ foo: "bar" }, { env: { MY_VARIABLE: "my-value" } }); + * ``` + */ + env?: TriggerEnvironmentVariablesOption; }; export type TriggerAndWaitOptions = Omit; diff --git a/packages/core/src/v3/workers/index.ts b/packages/core/src/v3/workers/index.ts index 83c4cc1d54..c3728d6345 100644 --- a/packages/core/src/v3/workers/index.ts +++ b/packages/core/src/v3/workers/index.ts @@ -30,3 +30,4 @@ export { StandardLocalsManager } from "../locals/manager.js"; export { populateEnv } from "./populateEnv.js"; export { StandardTraceContextManager } from "../traceContext/manager.js"; export { StandardHeartbeatsManager } from "../heartbeats/manager.js"; +export { SandboxTaskExecutor, type SandboxTaskExecutorOptions } from "./sandboxTaskExecutor.js"; diff --git a/packages/core/src/v3/workers/sandboxTaskExecutor.ts b/packages/core/src/v3/workers/sandboxTaskExecutor.ts new file mode 100644 index 0000000000..0ad0c23798 --- /dev/null +++ b/packages/core/src/v3/workers/sandboxTaskExecutor.ts @@ -0,0 +1,228 @@ +import { context, SpanKind } from "@opentelemetry/api"; +import { promiseWithResolvers } from "../../utils.js"; +import { ConsoleInterceptor } from "../consoleInterceptor.js"; +import { parseError, sanitizeError } from "../errors.js"; +import { attemptKey, runMetadata, traceContext } from "../index.js"; +import { recordSpanException, TracingSDK } from "../otel/index.js"; +import { runTimelineMetrics } from "../run-timeline-metrics-api.js"; +import { + COLD_VARIANT, + TaskRunErrorCodes, + TaskRunExecution, + TaskRunExecutionResult, +} from "../schemas/index.js"; +import { SemanticInternalAttributes } from 
"../semanticInternalAttributes.js"; +import { TriggerTracer } from "../tracer.js"; +import { tryCatch } from "../tryCatch.js"; +import { + conditionallyExportPacket, + conditionallyImportPacket, + createPacketAttributes, + parsePacket, + stringifyIO, +} from "../utils/ioSerialization.js"; + +export type SandboxTaskExecutorOptions = { + tracingSDK: TracingSDK; + tracer: TriggerTracer; + consoleInterceptor: ConsoleInterceptor; +}; + +export class SandboxTaskExecutor { + private _tracingSDK: TracingSDK; + private _tracer: TriggerTracer; + private _consoleInterceptor: ConsoleInterceptor; + + constructor(private readonly options: SandboxTaskExecutorOptions) { + this._tracingSDK = options.tracingSDK; + this._tracer = options.tracer; + this._consoleInterceptor = options.consoleInterceptor; + } + + async execute( + execution: TaskRunExecution, + signal: AbortSignal + ): Promise<{ result: TaskRunExecutionResult }> { + const attemptMessage = `Attempt ${execution.attempt.number}`; + + const originalPacket = { + data: execution.run.payload, + dataType: execution.run.payloadType, + }; + + if (execution.run.metadata) { + runMetadata.enterWithMetadata(execution.run.metadata); + } + + const result = await this._tracer.startActiveSpan( + attemptMessage, + async (span) => { + const attemptContext = context.active(); + + return await this._consoleInterceptor.intercept(console, async () => { + let parsedPayload: any; + let initOutput: any; + + const [inputError, payloadResult] = await tryCatch( + runTimelineMetrics.measureMetric("trigger.dev/execution", "payload", async () => { + const payloadPacket = await conditionallyImportPacket(originalPacket, this._tracer); + return await parsePacket(payloadPacket); + }) + ); + + if (inputError) { + recordSpanException(span, inputError); + return this.#internalErrorResult( + execution, + TaskRunErrorCodes.TASK_INPUT_ERROR, + inputError + ); + } + + const [parsePayloadError, parsedPayloadResult] = await tryCatch( + 
this.#parsePayload(payloadResult) + ); + + if (parsePayloadError) { + recordSpanException(span, parsePayloadError); + return this.#internalErrorResult( + execution, + TaskRunErrorCodes.TASK_INPUT_ERROR, + parsePayloadError, + true + ); + } + + parsedPayload = parsedPayloadResult; + + const { + promise: runPromise, + resolve: runResolve, + reject: runReject, + } = promiseWithResolvers(); + + // Make sure the run promise does not cause unhandled promise rejections + runPromise.catch(() => {}); + + const executeTask = async (payload: any) => { + const [runError, output] = await tryCatch( + (async () => { + return {}; + })() + ); + + if (runError) { + runReject(runError); + + return { + id: execution.run.id, + ok: false, + error: sanitizeError(parseError(runError)), + } satisfies TaskRunExecutionResult; + } + + runResolve(output as any); + + const [outputError, stringifiedOutput] = await tryCatch(stringifyIO(output)); + + if (outputError) { + recordSpanException(span, outputError); + + return this.#internalErrorResult( + execution, + TaskRunErrorCodes.TASK_OUTPUT_ERROR, + outputError + ); + } + + const [exportError, finalOutput] = await tryCatch( + conditionallyExportPacket( + stringifiedOutput, + `${attemptKey(execution)}/output`, + this._tracer + ) + ); + + if (exportError) { + recordSpanException(span, exportError); + + return this.#internalErrorResult( + execution, + TaskRunErrorCodes.TASK_OUTPUT_ERROR, + exportError + ); + } + + const [attrError, attributes] = await tryCatch( + createPacketAttributes( + finalOutput, + SemanticInternalAttributes.OUTPUT, + SemanticInternalAttributes.OUTPUT_TYPE + ) + ); + + if (!attrError && attributes) { + span.setAttributes(attributes); + } + + return { + ok: true, + id: execution.run.id, + output: finalOutput.data, + outputType: finalOutput.dataType, + } satisfies TaskRunExecutionResult; + }; + + return await executeTask(parsedPayload); + }); + }, + { + kind: SpanKind.CONSUMER, + attributes: { + 
[SemanticInternalAttributes.STYLE_ICON]: "attempt", + [SemanticInternalAttributes.ENTITY_TYPE]: "attempt", + [SemanticInternalAttributes.SPAN_ATTEMPT]: true, + ...(execution.attempt.number === 1 + ? runTimelineMetrics.convertMetricsToSpanAttributes() + : {}), + [SemanticInternalAttributes.STYLE_VARIANT]: COLD_VARIANT, + }, + events: + execution.attempt.number === 1 + ? runTimelineMetrics.convertMetricsToSpanEvents() + : undefined, + }, + traceContext.extractContext() + ); + + return { result }; + } + + async #parsePayload(payload: unknown) { + return payload; + } + + #internalErrorResult( + execution: TaskRunExecution, + code: TaskRunErrorCodes, + error: unknown, + skippedRetrying?: boolean + ) { + return { + ok: false, + id: execution.run.id, + error: { + type: "INTERNAL_ERROR", + code, + message: + error instanceof Error + ? `${error.name}: ${error.message}` + : typeof error === "string" + ? error + : undefined, + stackTrace: error instanceof Error ? error.stack : undefined, + }, + skippedRetrying, + } satisfies TaskRunExecutionResult; + } +} diff --git a/packages/trigger-sdk/src/v3/index.ts b/packages/trigger-sdk/src/v3/index.ts index 77448ae432..2b44ace503 100644 --- a/packages/trigger-sdk/src/v3/index.ts +++ b/packages/trigger-sdk/src/v3/index.ts @@ -16,6 +16,7 @@ export * from "./locals.js"; export * from "./otel.js"; export * from "./schemas.js"; export * from "./heartbeats.js"; +export * from "./sandbox.js"; export type { Context }; import type { Context } from "./shared.js"; diff --git a/packages/trigger-sdk/src/v3/sandbox.ts b/packages/trigger-sdk/src/v3/sandbox.ts new file mode 100644 index 0000000000..9889bfb0d7 --- /dev/null +++ b/packages/trigger-sdk/src/v3/sandbox.ts @@ -0,0 +1,83 @@ +import { + resourceCatalog, + RunHandle, + type SandboxPayload, + SandboxPayloadSchema, + TaskRunPromise, + TriggerAndWaitOptions, +} from "@trigger.dev/core/v3"; +import { Task, TaskOptions, createSchemaTask } from "./shared.js"; + +/** + * The options for defining a 
sandbox task. + */ +export type SandboxOptions = Pick< + TaskOptions, + "id" | "description" | "machine" | "retry" | "queue" | "maxDuration" +> & { + /** + * The npm/jsr packages to install in the sandbox. + */ + packages: string[]; + /** + * The system packages to install in the sandbox. + */ + systemPackages: string[]; + /** + * The runtime to use for the sandbox. + */ + runtime: "node:22" | "node:24" | "bun:1.3.0"; +}; + +export type RunCodeOptions = TriggerAndWaitOptions; + +export type SandboxTask = Task & { + runCodeAndWait: ( + payload: SandboxPayload, + options?: RunCodeOptions + ) => TaskRunPromise; + runCode: ( + payload: SandboxPayload, + options?: RunCodeOptions + ) => Promise>; +}; + +function defineSandbox( + params: SandboxOptions +): SandboxTask { + const task = createSchemaTask({ + ...params, + schema: SandboxPayloadSchema, + run: async (payload) => { + return {} as any; + }, + }); + + resourceCatalog.updateTaskMetadata(task.id, { + triggerSource: "sandbox", + sandbox: { + packages: params.packages, + systemPackages: params.systemPackages, + runtime: params.runtime, + }, + }); + + return { + ...task, + runCodeAndWait: (payload, options) => { + return task.triggerAndWait(payload, options) as TaskRunPromise; + }, + runCode: async (payload, options) => { + return (await task.trigger(payload, options)) as RunHandle; + }, + }; +} + +export const sandbox = { + /** + * Define a sandbox task. + * @param params - The parameters for the sandbox task. + * @returns The sandbox task. 
+ */ + define: defineSandbox, +}; diff --git a/packages/trigger-sdk/src/v3/shared.ts b/packages/trigger-sdk/src/v3/shared.ts index 11b92c2f43..c3f8288a03 100644 --- a/packages/trigger-sdk/src/v3/shared.ts +++ b/packages/trigger-sdk/src/v3/shared.ts @@ -90,6 +90,8 @@ import type { TriggerAndWaitOptions, TriggerApiRequestOptions, TriggerOptions, + TriggerEnvironmentVariablesOption, + TriggerTaskEnvironmentVariablesConfig, } from "@trigger.dev/core/v3"; export type { @@ -1179,6 +1181,7 @@ async function trigger_internal( priority: options?.priority, region: options?.region, lockToVersion: options?.version ?? getEnvVar("TRIGGER_VERSION"), + env: envOptionToEnvironmentVariablesConfig(options?.env), }, }, { @@ -1336,6 +1339,7 @@ async function triggerAndWait_internal, + }; + } + + return; +} diff --git a/references/hello-world/src/trigger/sandboxes.ts b/references/hello-world/src/trigger/sandboxes.ts new file mode 100644 index 0000000000..f331a72a71 --- /dev/null +++ b/references/hello-world/src/trigger/sandboxes.ts @@ -0,0 +1,90 @@ +import { sandbox, task } from "@trigger.dev/sdk"; + +export const helloWorldSandbox = sandbox.define({ + id: "hello-world-sandbox-2", + packages: ["sharp"], + systemPackages: [], + runtime: "node:22", +}); + +export const helloWorldSandboxTask = task({ + id: "hello-world-sandbox-task", + run: async (payload: any, { ctx }) => { + // Dynamically generate a simple image using sharp and return it as a base64 string + const result = await helloWorldSandbox.runCodeAndWait( + { + entry: "index.ts#main", + input: { + width: 32, + height: 32, + colors: [ + [255, 0, 0], // Red + [0, 255, 0], // Green + [0, 0, 255], // Blue + [255, 255, 0], // Yellow + [255, 255, 255], // White + ], + }, + files: [ + { + path: "index.ts", + content: ` + import sharp from 'sharp'; + + type RGB = [number, number, number]; + + export async function main({ width = 32, height = 32, colors }: { width: number, height: number, colors?: RGB[] }) { + console.log("MY_VARIABLE", 
process.env.MY_VARIABLE); + console.log(JSON.stringify({ message: "env vars", env: process.env })); + + const defaultColors: RGB[] = [ + [255, 0, 0], // Red + [0, 255, 0], // Green + [0, 0, 255], // Blue + [255, 255, 0], // Yellow + [255, 255, 255] // White + ]; + + const selectedColors = colors || defaultColors; + const channels = 3; // RGB + const buffer = Buffer.alloc(width * height * channels); + + // Generate random pixel data + for (let i = 0; i < buffer.length; i += channels) { + const color = selectedColors[Math.floor(Math.random() * selectedColors.length)]; + buffer[i] = color[0]; // R + buffer[i + 1] = color[1]; // G + buffer[i + 2] = color[2]; // B + } + + // Convert to PNG and get base64 + const pngBuffer = await sharp(buffer, { + raw: { + width: width, + height: height, + channels: 3 + } + }) + .png() + .toBuffer(); + + return pngBuffer.toString('base64'); + } + `, + }, + ], + }, + { + env: { + MY_VARIABLE: "my-value", + }, + } + ); + + if (result.ok) { + console.log(result.output); + } + + return result; + }, +});