diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 0fdd95460..72d8fba6a 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -83,6 +83,11 @@ jobs: gradle-home-cache-excludes: | caches/modules-2/files-2.1/com.jetbrains.intellij.idea + - name: Set env + run: | + echo "SPP_APPLICATION_GIT_COMMIT=${GITHUB_SHA}" >> $GITHUB_ENV + echo "SPP_APPLICATION_ENVIRONMENT=ci" >> $GITHUB_ENV + - name: Start spp-platform id: start_platform run: ./gradlew :platform:assembleUp @@ -169,6 +174,11 @@ jobs: gradle-home-cache-excludes: | caches/modules-2/files-2.1/com.jetbrains.intellij.idea + - name: Set env + run: | + echo "SPP_APPLICATION_GIT_COMMIT=${GITHUB_SHA}" >> $GITHUB_ENV + echo "SPP_APPLICATION_ENVIRONMENT=ci" >> $GITHUB_ENV + - name: Start spp-platform id: start_platform run: ./gradlew :platform:assembleUp @@ -245,6 +255,11 @@ jobs: echo " environment:" >> docker-compose.override.yml echo " - SPP_CLIENT_ACCESS_ENABLED=false" >> docker-compose.override.yml + - name: Set env + run: | + echo "SPP_APPLICATION_GIT_COMMIT=${GITHUB_SHA}" >> $GITHUB_ENV + echo "SPP_APPLICATION_ENVIRONMENT=ci" >> $GITHUB_ENV + - name: Start spp-platform id: start_platform run: ./gradlew :platform:assembleUp @@ -307,6 +322,11 @@ jobs: gradle-home-cache-excludes: | caches/modules-2/files-2.1/com.jetbrains.intellij.idea + - name: Set env + run: | + echo "SPP_APPLICATION_GIT_COMMIT=${GITHUB_SHA}" >> $GITHUB_ENV + echo "SPP_APPLICATION_ENVIRONMENT=ci" >> $GITHUB_ENV + - name: Start spp-platform id: start_platform run: ./gradlew :platform:assembleUp diff --git a/demos/kotlin b/demos/kotlin index 48af29a48..bd5832bae 160000 --- a/demos/kotlin +++ b/demos/kotlin @@ -1 +1 @@ -Subproject commit 48af29a48e0b70d599973f130f53841c5a67259a +Subproject commit bd5832bae51906c6988eaac179981cdee05f9483 diff --git a/docker/e2e/.env b/docker/e2e/.env index 0c21570fb..ef2b49604 100644 --- a/docker/e2e/.env +++ b/docker/e2e/.env @@ -7,4 +7,6 @@ SPP_PROBE_CLIENT_SECRET=test-secret SPP_PROBE_TENANT_ID=test-tenant SPP_PROBE_PLATFORM_CERTIFICATE= SPP_STORAGE=redis -SW_STORAGE=h2 \ No newline at end of file +SW_STORAGE=h2 +SPP_APPLICATION_GIT_COMMIT= +SPP_APPLICATION_ENVIRONMENT=local \ No newline at end of file diff --git a/docker/e2e/Dockerfile-spp-platform b/docker/e2e/Dockerfile-spp-platform index f6226195a..b529ea14b 100644 --- a/docker/e2e/Dockerfile-spp-platform +++ b/docker/e2e/Dockerfile-spp-platform @@ -20,8 +20,23 @@ RUN printf "\nspp-live-instrument:\n selector: \${SPP_LIVE_INSTRUMENT:default}\ RUN printf "\nexporter:\n selector: \${SPP_LIVE_VIEW:default}\n default:\n" \ >> /skywalking/config/application.yml -# replace default receiver-meter with spp-live-meter-receiver -RUN sed -i -z 's/receiver-meter:\n selector: \${SW_RECEIVER_METER:default}\n default:/receiver-meter:\n selector: \${SW_RECEIVER_METER:spp-live-meter-receiver}\n spp-live-meter-receiver:/' /skywalking/config/application.yml +# replace default receiver-meter with spp-receiver-meter +RUN sed -i -z 's/receiver-meter:\n selector: \${SW_RECEIVER_METER:default\}\n default:/receiver-meter:\n selector: \${SW_RECEIVER_METER:spp-receiver-meter}\n spp-receiver-meter:/' /skywalking/config/application.yml + +# replace default receiver-trace with spp-receiver-trace +RUN sed -i -z 's/receiver-trace:\n selector: \${SW_RECEIVER_TRACE:default\}\n default:/receiver-trace:\n selector: \${SW_RECEIVER_TRACE:spp-receiver-trace}\n spp-receiver-trace:/' /skywalking/config/application.yml + +# replace default event-analyzer with spp-event-analyzer +RUN sed -i -z 's/event-analyzer:\n selector: \${SW_EVENT_ANALYZER:default}\n default:/event-analyzer:\n selector: \${SW_EVENT_ANALYZER:spp-event-analyzer}\n spp-event-analyzer:/' /skywalking/config/application.yml + +# replace default receiver-jvm with spp-receiver-jvm +RUN sed -i -z 's/receiver-jvm:\n selector: \${SW_RECEIVER_JVM:default\}\n default:/receiver-jvm:\n selector: \${SW_RECEIVER_JVM:spp-receiver-jvm}\n spp-receiver-jvm:/' /skywalking/config/application.yml + +# replace default receiver-log with spp-receiver-log +RUN sed -i -z 's/receiver-log:\n selector: \${SW_RECEIVER_LOG:default\}\n default:/receiver-log:\n selector: \${SW_RECEIVER_LOG:spp-receiver-log}\n spp-receiver-log:/' /skywalking/config/application.yml + +# replace default receiver-register with spp-receiver-register +RUN sed -i -z 's/receiver-register:\n selector: \${SW_RECEIVER_REGISTER:default\}\n default:/receiver-register:\n selector: \${SW_RECEIVER_REGISTER:spp-receiver-register}\n spp-receiver-register:/' /skywalking/config/application.yml ADD ./config/spp-platform.crt /skywalking/config/ ADD ./config/spp-platform.key /skywalking/config/ diff --git a/docker/e2e/docker-compose.yml b/docker/e2e/docker-compose.yml index 3ecce8b23..9e2dfd95a 100644 --- a/docker/e2e/docker-compose.yml +++ b/docker/e2e/docker-compose.yml @@ -24,6 +24,8 @@ services: dockerfile: Dockerfile-spp-platform environment: - JAVA_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5106 -javaagent:/tmp/spp-probe/spp-probe.jar + - SPP_APPLICATION_GIT_COMMIT=${SPP_APPLICATION_GIT_COMMIT} + - SPP_APPLICATION_ENVIRONMENT=${SPP_APPLICATION_ENVIRONMENT} - SPP_PROBE_ENABLED=${SPP_PROBE_ENABLED} - SPP_PROBE_WAIT_FOR_PLATFORM=${SPP_PROBE_WAIT_FOR_PLATFORM} - SPP_PROBE_PLATFORM_HOST=${SPP_PROBE_PLATFORM_HOST} diff --git a/docker/e2e/spp-probe-platform.yml b/docker/e2e/spp-probe-platform.yml index e06e80365..c7f21ae04 100644 --- a/docker/e2e/spp-probe-platform.yml +++ b/docker/e2e/spp-probe-platform.yml @@ -1,5 +1,5 @@ spp: - enabled: ${SPP_PROBE_ENABLED:-true} + enabled: ${SPP_PROBE_ENABLED:-false} authentication: client_id: ${SPP_PROBE_CLIENT_ID:-test-id} client_secret: ${SPP_PROBE_CLIENT_SECRET:-test-secret} @@ -10,14 +10,13 @@ spp: delete_probe_directory_on_boot: ${SPP_PROBE_DELETE_PROBE_DIRECTORY_ON_BOOT:-false} wait_for_platform: ${SPP_PROBE_WAIT_FOR_PLATFORM:-false} quiet_mode: ${SPP_PROBE_QUIET_MODE:-false} + application: + environment: ${SPP_APPLICATION_ENVIRONMENT:-} + version: ${SPP_APPLICATION_VERSION:-} + git_commit: ${SPP_APPLICATION_GIT_COMMIT:-} skywalking: logging: level: "DEBUG" agent: service_name: "spp-platform-ci" - -application: - environment: "ci" - version: "1.0.0" - git_commit: "8d9b0d9" diff --git a/interfaces/cli b/interfaces/cli index 854642d3a..30c045ca5 160000 --- a/interfaces/cli +++ b/interfaces/cli @@ -1 +1 @@ -Subproject commit 854642d3a30eb501fe5f785025e30dce4a74fd15 +Subproject commit 30c045ca5ab7b7a9f6ba7f428008bbf161629ec7 diff --git a/interfaces/jetbrains b/interfaces/jetbrains index 30b5d355a..cb2eb20cf 160000 --- a/interfaces/jetbrains +++ b/interfaces/jetbrains @@ -1 +1 @@ -Subproject commit 30b5d355a67358051e65e892a0c6bd8557e0d164 +Subproject commit cb2eb20cf0b415d4e2b0cb7013da7c7890124d21 diff --git a/platform/build.gradle.kts b/platform/build.gradle.kts index 0484bfada..77a152a18 100644 --- a/platform/build.gradle.kts +++ b/platform/build.gradle.kts @@ -79,6 +79,8 @@ subprojects { compileOnly("com.graphql-java:graphql-java:20.2") //tied to SkyWalking OAP version compileOnly("com.google.protobuf:protobuf-java:3.21.8") //tied to SkyWalking OAP version compileOnly("io.grpc:grpc-api:1.49.0") //tied to SkyWalking OAP version + compileOnly("io.grpc:grpc-stub:1.49.0") //tied to SkyWalking OAP version + compileOnly("com.linecorp.armeria:armeria:1.23.1") //tied to SkyWalking OAP version implementation("io.vertx:vertx-auth-jwt:$vertxVersion") implementation("io.vertx:vertx-redis-client:$vertxVersion") implementation("io.vertx:vertx-tcp-eventbus-bridge:$vertxVersion") @@ -228,8 +230,8 @@ dockerCompose { startedServices.set(listOf("redis", "spp-platform")) } - //transfer SPP_PROBE_ env vars to containers - System.getenv().filterKeys { it.startsWith("SPP_PROBE_") } + //transfer SPP_PROBE_/SW_ env vars to containers + System.getenv().filterKeys { it.startsWith("SPP_PROBE_") || it.startsWith("SW_") } .forEach { (key, value) -> environment.put(key, value) } } tasks.getByName("composeBuild") diff --git a/platform/common/src/main/kotlin/spp/platform/common/util/ContextUtil.kt b/platform/common/src/main/kotlin/spp/platform/common/util/ContextUtil.kt index ac0c5f3ac..a6eacce94 100644 --- a/platform/common/src/main/kotlin/spp/platform/common/util/ContextUtil.kt +++ b/platform/common/src/main/kotlin/spp/platform/common/util/ContextUtil.kt @@ -30,6 +30,12 @@ object ContextUtil { @JvmStatic val TENANT_ID = Context.key("spp-platform.tenant-id")!! + @JvmStatic + val COMMIT_ID = Context.key("spp-platform.commit-id")!! + + @JvmStatic + val ENVIRONMENT = Context.key("spp-platform.environment")!! + @JvmStatic fun addToVertx(context: Context?) { if (context == null) return @@ -55,5 +61,19 @@ object ContextUtil { vertxContext.removeLocal("tenant_id") } } + COMMIT_ID.get(context).let { + if (it != null) { + vertxContext.putLocal("commit_id", it) + } else { + vertxContext.put("commit_id", "null") + } + } + ENVIRONMENT.get(context).let { + if (it != null) { + vertxContext.putLocal("environment", it) + } else { + vertxContext.put("environment", "null") + } + } } } diff --git a/platform/core/build.gradle.kts b/platform/core/build.gradle.kts index 94f10854d..1f82a8f46 100644 --- a/platform/core/build.gradle.kts +++ b/platform/core/build.gradle.kts @@ -8,6 +8,7 @@ plugins { val platformGroup: String by project val projectVersion: String by project +val skywalkingVersion: String by project group = platformGroup version = project.properties["platformVersion"] as String? ?: projectVersion @@ -40,6 +41,18 @@ configure { dependencies { compileOnly(project(":platform:storage")) implementation(project(":platform:common")) + compileOnly("org.apache.skywalking:skywalking-meter-receiver-plugin:$skywalkingVersion") { + isTransitive = false + } + compileOnly("org.apache.skywalking:skywalking-jvm-receiver-plugin:$skywalkingVersion") { + isTransitive = false + } + compileOnly("org.apache.skywalking:skywalking-log-recevier-plugin:$skywalkingVersion") { + isTransitive = false + } + compileOnly("org.apache.skywalking:skywalking-management-receiver-plugin:$skywalkingVersion") { + isTransitive = false + } //todo: properly add test dependency testImplementation(project(":platform:common").dependencyProject.extensions.getByType(SourceSetContainer::class).test.get().output) diff --git a/platform/core/src/main/kotlin/spp/platform/core/SourcePlatform.kt b/platform/core/src/main/kotlin/spp/platform/core/SourcePlatform.kt index b4c234ef5..0b5941fa6 100644 --- a/platform/core/src/main/kotlin/spp/platform/core/SourcePlatform.kt +++ b/platform/core/src/main/kotlin/spp/platform/core/SourcePlatform.kt @@ -186,15 +186,16 @@ class SourcePlatform(private val manager: ModuleManager) : CoroutineVerticle() { } //Start services + val serviceProvider = ServiceProvider(jwt, manager) vertx.deployVerticle( - ServiceProvider(jwt, manager), + serviceProvider, DeploymentOptions().setConfig(config.put("SPP_INSTANCE_ID", SPP_INSTANCE_ID)) ).await() //Add SkyWalking interceptors val grpcHandlerRegister = manager.find(SharingServerModule.NAME) .provider().getService(GRPCHandlerRegister::class.java) - grpcHandlerRegister.addFilter(SkyWalkingGrpcInterceptor(vertx, config)) + grpcHandlerRegister.addFilter(SkyWalkingGrpcInterceptor(vertx, config, serviceProvider.managementService)) vertx.deployVerticle(SkyWalkingGraphqlInterceptor(router), DeploymentOptions().setConfig(config)).await() if (httpSslEnabled) { diff --git a/platform/core/src/main/kotlin/spp/platform/core/interceptors/SkyWalkingGrpcInterceptor.kt b/platform/core/src/main/kotlin/spp/platform/core/interceptors/SkyWalkingGrpcInterceptor.kt index b94ad0c55..ba4389555 100644 --- a/platform/core/src/main/kotlin/spp/platform/core/interceptors/SkyWalkingGrpcInterceptor.kt +++ b/platform/core/src/main/kotlin/spp/platform/core/interceptors/SkyWalkingGrpcInterceptor.kt @@ -21,18 +21,24 @@ import com.google.common.cache.CacheBuilder import io.grpc.* import io.vertx.core.Vertx import io.vertx.core.json.JsonObject +import io.vertx.kotlin.coroutines.await import io.vertx.kotlin.coroutines.dispatcher import kotlinx.coroutines.runBlocking import mu.KotlinLogging import spp.platform.common.util.ContextUtil import spp.platform.storage.SourceStorage +import spp.protocol.platform.status.InstanceConnection +import spp.protocol.service.LiveManagementService import java.util.concurrent.TimeUnit -class SkyWalkingGrpcInterceptor(private val vertx: Vertx, private val config: JsonObject) : ServerInterceptor { +class SkyWalkingGrpcInterceptor( + private val vertx: Vertx, + private val config: JsonObject, + private val managementService: LiveManagementService +) : ServerInterceptor { companion object { private val log = KotlinLogging.logger {} - private val AUTH_HEAD_HEADER_NAME = Metadata.Key.of("Authentication", Metadata.ASCII_STRING_MARSHALLER) } @@ -41,11 +47,20 @@ class SkyWalkingGrpcInterceptor(private val vertx: Vertx, private val config: Js .expireAfterAccess(1, TimeUnit.MINUTES) .build() + /** + * Intercepts gRPC calls and checks for authentication, adds VCS data to the context, and adds the tenant ID to the + * context if it is present in the auth header. + */ override fun interceptCall( call: ServerCall, headers: Metadata?, next: ServerCallHandler ): ServerCall.Listener { + val clients = runBlocking(vertx.dispatcher()) { managementService.getClients().await() } + val clientOb = clients.getJsonArray("probes").firstOrNull() as? JsonObject//todo: find real client + val client = clientOb?.let { InstanceConnection(it) } + val commitId = (client?.meta?.get("application") as? JsonObject)?.getString("git_commit") + val authHeader = headers?.get(AUTH_HEAD_HEADER_NAME) if (authHeader != null && probeAuthCache.getIfPresent(authHeader) != null) { val authParts = authHeader.split(":") @@ -57,6 +72,7 @@ class SkyWalkingGrpcInterceptor(private val vertx: Vertx, private val config: Js .withValue(ContextUtil.CLIENT_ID, clientId) .withValue(ContextUtil.CLIENT_ACCESS, clientSecret) .withValue(ContextUtil.TENANT_ID, tenantId) + .withValue(ContextUtil.COMMIT_ID, commitId) return Contexts.interceptCall(context, call, headers, next) } else { val authEnabled = config.getJsonObject("client-access")?.getString("enabled")?.toBooleanStrictOrNull() @@ -90,6 +106,7 @@ class SkyWalkingGrpcInterceptor(private val vertx: Vertx, private val config: Js .withValue(ContextUtil.CLIENT_ID, clientId) .withValue(ContextUtil.CLIENT_ACCESS, clientSecret) .withValue(ContextUtil.TENANT_ID, tenantId) + .withValue(ContextUtil.COMMIT_ID, commitId) Contexts.interceptCall(context, call, headers, next) } } diff --git a/platform/core/src/main/kotlin/spp/platform/core/service/LiveManagementServiceImpl.kt b/platform/core/src/main/kotlin/spp/platform/core/service/LiveManagementServiceImpl.kt index 9f33ae8ee..a2d2f0a9a 100644 --- a/platform/core/src/main/kotlin/spp/platform/core/service/LiveManagementServiceImpl.kt +++ b/platform/core/src/main/kotlin/spp/platform/core/service/LiveManagementServiceImpl.kt @@ -570,7 +570,6 @@ class LiveManagementServiceImpl( services.forEach { result.add( Service( - id = it.id, name = it.name, group = it.group, shortName = it.shortName, diff --git a/platform/core/src/main/kotlin/spp/platform/core/service/ServiceProvider.kt b/platform/core/src/main/kotlin/spp/platform/core/service/ServiceProvider.kt index b8c274e22..e0f511ba8 100644 --- a/platform/core/src/main/kotlin/spp/platform/core/service/ServiceProvider.kt +++ b/platform/core/src/main/kotlin/spp/platform/core/service/ServiceProvider.kt @@ -51,7 +51,7 @@ class ServiceProvider( private lateinit var discovery: ServiceDiscovery private lateinit var managementServiceRecord: Record - private lateinit var managementService: LiveManagementService + lateinit var managementService: LiveManagementService override suspend fun start() { try { diff --git a/platform/core/src/main/kotlin/spp/platform/core/vcs/ServiceVCS.kt b/platform/core/src/main/kotlin/spp/platform/core/vcs/ServiceVCS.kt new file mode 100644 index 000000000..f42514b69 --- /dev/null +++ b/platform/core/src/main/kotlin/spp/platform/core/vcs/ServiceVCS.kt @@ -0,0 +1,59 @@ +/* + * Source++, the continuous feedback platform for developers. + * Copyright (C) 2022-2023 CodeBrig, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package spp.platform.core.vcs + +import com.google.protobuf.Message +import org.apache.skywalking.apm.network.language.agent.v3.MeterData +import spp.platform.common.util.ContextUtil + +object ServiceVCS { + + fun getServiceName(message: Message): String { + if (message.descriptorForType.findFieldByName("service") != null) { + val service = message.getField(message.descriptorForType.findFieldByName("service")).toString() + if (service.isEmpty() && message is MeterData) { + return "" // not all MeterData messages have a service name + } + require(service.isNotEmpty()) { "Message ${message.descriptorForType} does not have a service name" } + + return service + getEnvironment() + getCommitId() + } else if (message.descriptorForType.findFieldByName("source") != null) { + val source = message.getField(message.descriptorForType.findFieldByName("source")) + if (source is Message) { + val service = source.getField(source.descriptorForType.findFieldByName("service")).toString() + require(service.isNotEmpty()) { "Message ${message.descriptorForType} does not have a service name" } + + return service + getEnvironment() + getCommitId() + } + } + + throw IllegalArgumentException("Message " + message.descriptorForType + " does not have a service name") + } + + private fun getEnvironment(): String { + val env = ContextUtil.ENVIRONMENT.get() + if (env.isNullOrEmpty()) return "|null" + return "|$env" + } + + private fun getCommitId(): String { + val commitId = ContextUtil.COMMIT_ID.get() + if (commitId.isNullOrEmpty()) return "|null" + return "|$commitId" + } +} diff --git a/platform/core/src/main/kotlin/spp/platform/core/vcs/providers/VCSEventModuleProvider.kt b/platform/core/src/main/kotlin/spp/platform/core/vcs/providers/VCSEventModuleProvider.kt new file mode 100644 index 000000000..a6c332dd0 --- /dev/null +++ b/platform/core/src/main/kotlin/spp/platform/core/vcs/providers/VCSEventModuleProvider.kt @@ -0,0 +1,63 @@ +/* + * Source++, the continuous feedback platform for developers. + * Copyright (C) 2022-2023 CodeBrig, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package spp.platform.core.vcs.providers + +import org.apache.skywalking.apm.network.event.v3.Event +import org.apache.skywalking.oap.server.analyzer.event.EventAnalyzerModule +import org.apache.skywalking.oap.server.analyzer.event.EventAnalyzerModuleProvider +import org.apache.skywalking.oap.server.analyzer.event.EventAnalyzerService +import org.apache.skywalking.oap.server.analyzer.event.EventAnalyzerServiceImpl +import org.apache.skywalking.oap.server.analyzer.event.listener.EventAnalyzerListener +import org.apache.skywalking.oap.server.library.module.ModuleManager +import spp.platform.core.vcs.ServiceVCS + +/** + * Overrides the default [EventAnalyzerServiceImpl] to add the VCS service name to the request. + */ +class VCSEventModuleProvider : EventAnalyzerModuleProvider() { + + override fun name(): String = "spp-event-analyzer" + + override fun start() { + super.start() + + val analyzerService = manager.find(EventAnalyzerModule.NAME).provider() + .getService(EventAnalyzerService::class.java) as EventAnalyzerServiceImpl + registerServiceImplementation( + EventAnalyzerService::class.java, + VCSEventAnalyzerService(analyzerService, manager) + ) + } + + private class VCSEventAnalyzerService( + private val delegate: EventAnalyzerServiceImpl, + manager: ModuleManager + ) : EventAnalyzerServiceImpl(manager) { + override fun analyze(event: Event) = delegate.analyze( + event.toBuilder().setSource( + event.source.toBuilder().setService( + ServiceVCS.getServiceName(event) + ).build() + ).build() + ) + + override fun add(factory: EventAnalyzerListener.Factory) = delegate.add(factory) + override fun getEventAnalyzerListenerFactories(): MutableList = + delegate.eventAnalyzerListenerFactories + } +} diff --git a/platform/core/src/main/kotlin/spp/platform/core/vcs/providers/VCSJVMModuleProvider.kt b/platform/core/src/main/kotlin/spp/platform/core/vcs/providers/VCSJVMModuleProvider.kt new file mode 100644 index 000000000..162a67aba --- /dev/null +++ b/platform/core/src/main/kotlin/spp/platform/core/vcs/providers/VCSJVMModuleProvider.kt @@ -0,0 +1,68 @@ +/* + * Source++, the continuous feedback platform for developers. + * Copyright (C) 2022-2023 CodeBrig, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package spp.platform.core.vcs.providers + +import io.grpc.stub.StreamObserver +import org.apache.skywalking.apm.network.common.v3.Commands +import org.apache.skywalking.apm.network.language.agent.v3.JVMMetricCollection +import org.apache.skywalking.oap.server.core.CoreModule +import org.apache.skywalking.oap.server.core.oal.rt.OALEngineLoaderService +import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister +import org.apache.skywalking.oap.server.library.module.ModuleManager +import org.apache.skywalking.oap.server.receiver.jvm.provider.JVMModuleProvider +import org.apache.skywalking.oap.server.receiver.jvm.provider.JVMOALDefine +import org.apache.skywalking.oap.server.receiver.jvm.provider.handler.JVMMetricReportServiceHandler +import org.apache.skywalking.oap.server.receiver.jvm.provider.handler.JVMMetricReportServiceHandlerCompat +import org.apache.skywalking.oap.server.receiver.sharing.server.SharingServerModule +import spp.platform.core.vcs.ServiceVCS + +/** + * Overrides the default [JVMMetricReportServiceHandler] to add the VCS service name to the request. + */ +class VCSJVMModuleProvider : JVMModuleProvider() { + + override fun name(): String = "spp-receiver-jvm" + + override fun start() { + // load official analysis + manager.find(CoreModule.NAME) + .provider() + .getService(OALEngineLoaderService::class.java) + .load(JVMOALDefine.INSTANCE) + + val grpcHandlerRegister = manager.find(SharingServerModule.NAME) + .provider() + .getService(GRPCHandlerRegister::class.java) + val jvmMetricReportServiceHandler = + VCSJVMMetricReportServiceHandler(manager) + grpcHandlerRegister.addHandler(jvmMetricReportServiceHandler) + grpcHandlerRegister.addHandler(JVMMetricReportServiceHandlerCompat(jvmMetricReportServiceHandler)) + } + + private class VCSJVMMetricReportServiceHandler( + manager: ModuleManager, + private val delegate: JVMMetricReportServiceHandler = JVMMetricReportServiceHandler(manager) + ) : JVMMetricReportServiceHandler(manager) { + override fun collect(request: JVMMetricCollection, responseObserver: StreamObserver) = + delegate.collect( + request.toBuilder().setService( + ServiceVCS.getServiceName(request) + ).build(), responseObserver + ) + } +} diff --git a/platform/core/src/main/kotlin/spp/platform/core/vcs/providers/VCSLogModuleProvider.kt b/platform/core/src/main/kotlin/spp/platform/core/vcs/providers/VCSLogModuleProvider.kt new file mode 100644 index 000000000..7724671ce --- /dev/null +++ b/platform/core/src/main/kotlin/spp/platform/core/vcs/providers/VCSLogModuleProvider.kt @@ -0,0 +1,71 @@ +/* + * Source++, the continuous feedback platform for developers. + * Copyright (C) 2022-2023 CodeBrig, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package spp.platform.core.vcs.providers + +import com.linecorp.armeria.common.HttpMethod +import io.grpc.stub.StreamObserver +import org.apache.skywalking.apm.network.common.v3.Commands +import org.apache.skywalking.apm.network.logging.v3.LogData +import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister +import org.apache.skywalking.oap.server.core.server.HTTPHandlerRegister +import org.apache.skywalking.oap.server.library.module.ModuleManager +import org.apache.skywalking.oap.server.receiver.sharing.server.SharingServerModule +import org.apache.skywalking.oap.server.recevier.log.provider.LogModuleProvider +import org.apache.skywalking.oap.server.recevier.log.provider.handler.grpc.LogReportServiceGrpcHandler +import org.apache.skywalking.oap.server.recevier.log.provider.handler.rest.LogReportServiceHTTPHandler +import spp.platform.core.vcs.ServiceVCS + +/** + * Overrides the default [LogReportServiceGrpcHandler] to add the VCS service name to the request. + */ +class VCSLogModuleProvider : LogModuleProvider() { + + override fun name(): String = "spp-receiver-log" + + override fun start() { + val grpcHandlerRegister = manager.find(SharingServerModule.NAME) + .provider() + .getService(GRPCHandlerRegister::class.java) + + grpcHandlerRegister.addHandler(VCSLogReportServiceGrpcHandler(manager)) + + val httpHandlerRegister = manager.find(SharingServerModule.NAME) + .provider() + .getService(HTTPHandlerRegister::class.java) + httpHandlerRegister.addHandler(LogReportServiceHTTPHandler(manager), listOf(HttpMethod.POST)) + } + + private class VCSLogReportServiceGrpcHandler( + manager: ModuleManager, + private val delegate: LogReportServiceGrpcHandler = LogReportServiceGrpcHandler(manager) + ) : LogReportServiceGrpcHandler(manager) { + override fun collect(responseObserver: StreamObserver): StreamObserver { + val streamObserver = delegate.collect(responseObserver) + return object : StreamObserver { + override fun onNext(value: LogData) = streamObserver.onNext( + value.toBuilder().setService( + ServiceVCS.getServiceName(value) + ).build() + ) + + override fun onError(t: Throwable) = streamObserver.onError(t) + override fun onCompleted() = streamObserver.onCompleted() + } + } + } +} diff --git a/platform/core/src/main/kotlin/spp/platform/core/vcs/providers/VCSMeterReceiverProvider.kt b/platform/core/src/main/kotlin/spp/platform/core/vcs/providers/VCSMeterReceiverProvider.kt new file mode 100644 index 000000000..afbb9d208 --- /dev/null +++ b/platform/core/src/main/kotlin/spp/platform/core/vcs/providers/VCSMeterReceiverProvider.kt @@ -0,0 +1,81 @@ +/* + * Source++, the continuous feedback platform for developers. + * Copyright (C) 2022-2023 CodeBrig, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package spp.platform.core.vcs.providers + +import io.grpc.stub.StreamObserver +import org.apache.skywalking.apm.network.common.v3.Commands +import org.apache.skywalking.apm.network.language.agent.v3.MeterData +import org.apache.skywalking.apm.network.language.agent.v3.MeterDataCollection +import org.apache.skywalking.oap.server.analyzer.module.AnalyzerModule +import org.apache.skywalking.oap.server.analyzer.provider.meter.process.IMeterProcessService +import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister +import org.apache.skywalking.oap.server.library.module.ModuleManager +import org.apache.skywalking.oap.server.receiver.meter.provider.MeterReceiverProvider +import org.apache.skywalking.oap.server.receiver.meter.provider.handler.MeterServiceHandler +import org.apache.skywalking.oap.server.receiver.meter.provider.handler.MeterServiceHandlerCompat +import org.apache.skywalking.oap.server.receiver.sharing.server.SharingServerModule +import spp.platform.core.vcs.ServiceVCS + +/** + * Overrides the default [MeterServiceHandler] to add the VCS service name to the request. + */ +class VCSMeterReceiverProvider : MeterReceiverProvider() { + + override fun name(): String = "spp-receiver-meter" + + override fun start() { + val processService = manager.find(AnalyzerModule.NAME) + .provider() + .getService(IMeterProcessService::class.java) + val grpcHandlerRegister = manager.find(SharingServerModule.NAME) + .provider() + .getService(GRPCHandlerRegister::class.java) + val meterServiceHandlerCompat = VCSMeterServiceHandler(manager, processService!!) + grpcHandlerRegister.addHandler(meterServiceHandlerCompat) + grpcHandlerRegister.addHandler(MeterServiceHandlerCompat(meterServiceHandlerCompat)) + } + + private class VCSMeterServiceHandler( + manager: ModuleManager, + processService: IMeterProcessService, + private val delegate: MeterServiceHandler = MeterServiceHandler(manager, processService) + ) : MeterServiceHandler(manager, processService) { + override fun collect(responseObserver: StreamObserver): StreamObserver { + val streamObserver = delegate.collect(responseObserver) + return object : StreamObserver { + override fun onNext(value: MeterData) = streamObserver.onNext( + value.toBuilder().setService( + ServiceVCS.getServiceName(value) + ).build() + ) + + override fun onError(t: Throwable) = streamObserver.onError(t) + override fun onCompleted() = streamObserver.onCompleted() + } + } + + override fun collectBatch(responseObserver: StreamObserver): StreamObserver { + val streamObserver = delegate.collectBatch(responseObserver) + return object : StreamObserver { + override fun onNext(value: MeterDataCollection) = streamObserver.onNext(value) + override fun onError(t: Throwable) = streamObserver.onError(t) + override fun onCompleted() = streamObserver.onCompleted() + } + } + } +} diff --git a/platform/core/src/main/kotlin/spp/platform/core/vcs/providers/VCSRegisterModuleProvider.kt b/platform/core/src/main/kotlin/spp/platform/core/vcs/providers/VCSRegisterModuleProvider.kt new file mode 100644 index 000000000..66bed49bf --- /dev/null +++ b/platform/core/src/main/kotlin/spp/platform/core/vcs/providers/VCSRegisterModuleProvider.kt @@ -0,0 +1,76 @@ +/* + * Source++, the continuous feedback platform for developers. + * Copyright (C) 2022-2023 CodeBrig, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package spp.platform.core.vcs.providers + +import com.linecorp.armeria.common.HttpMethod +import io.grpc.stub.StreamObserver +import org.apache.skywalking.apm.network.common.v3.Commands +import org.apache.skywalking.apm.network.management.v3.InstancePingPkg +import org.apache.skywalking.apm.network.management.v3.InstanceProperties +import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister +import org.apache.skywalking.oap.server.core.server.HTTPHandlerRegister +import org.apache.skywalking.oap.server.library.module.ModuleManager +import org.apache.skywalking.oap.server.receiver.register.provider.RegisterModuleProvider +import org.apache.skywalking.oap.server.receiver.register.provider.handler.v8.grpc.ManagementServiceGRPCHandler +import org.apache.skywalking.oap.server.receiver.register.provider.handler.v8.grpc.ManagementServiceGrpcHandlerCompat +import org.apache.skywalking.oap.server.receiver.register.provider.handler.v8.rest.ManagementServiceHTTPHandler +import org.apache.skywalking.oap.server.receiver.sharing.server.SharingServerModule +import spp.platform.core.vcs.ServiceVCS + +/** + * Overrides the default [ManagementServiceGRPCHandler] to add the VCS service name to the request. + */ +class VCSRegisterModuleProvider : RegisterModuleProvider() { + + override fun name(): String = "spp-receiver-register" + + override fun start() { + val grpcHandlerRegister = manager.find(SharingServerModule.NAME) + .provider() + .getService(GRPCHandlerRegister::class.java) + val managementServiceHTTPHandler = + VCSManagementServiceGRPCHandler(manager) + grpcHandlerRegister.addHandler(managementServiceHTTPHandler) + grpcHandlerRegister.addHandler(ManagementServiceGrpcHandlerCompat(managementServiceHTTPHandler)) + + val httpHandlerRegister = manager.find(SharingServerModule.NAME) + .provider() + .getService(HTTPHandlerRegister::class.java) + httpHandlerRegister.addHandler(ManagementServiceHTTPHandler(manager), listOf(HttpMethod.POST)) + } + + private class VCSManagementServiceGRPCHandler( + manager: ModuleManager, + private val delegate: ManagementServiceGRPCHandler = ManagementServiceGRPCHandler(manager) + ) : ManagementServiceGRPCHandler(manager) { + + override fun reportInstanceProperties(request: InstanceProperties, responseObserver: StreamObserver) = + delegate.reportInstanceProperties( + request.toBuilder().setService( + ServiceVCS.getServiceName(request) + ).build(), responseObserver + ) + + override fun keepAlive(request: InstancePingPkg, responseObserver: StreamObserver) = + delegate.keepAlive( + request.toBuilder().setService( + ServiceVCS.getServiceName(request) + ).build(), responseObserver + ) + } +} diff --git a/platform/core/src/main/kotlin/spp/platform/core/vcs/providers/VCSTraceModuleProvider.kt b/platform/core/src/main/kotlin/spp/platform/core/vcs/providers/VCSTraceModuleProvider.kt new file mode 100644 index 000000000..411ef77dd --- /dev/null +++ b/platform/core/src/main/kotlin/spp/platform/core/vcs/providers/VCSTraceModuleProvider.kt @@ -0,0 +1,93 @@ +/* + * Source++, the continuous feedback platform for developers. + * Copyright (C) 2022-2023 CodeBrig, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package spp.platform.core.vcs.providers + +import com.linecorp.armeria.common.HttpMethod +import io.grpc.stub.StreamObserver +import org.apache.skywalking.apm.network.common.v3.Commands +import org.apache.skywalking.apm.network.language.agent.v3.SegmentCollection +import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject +import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister +import org.apache.skywalking.oap.server.core.server.HTTPHandlerRegister +import org.apache.skywalking.oap.server.library.module.ModuleManager +import org.apache.skywalking.oap.server.receiver.sharing.server.SharingServerModule +import org.apache.skywalking.oap.server.receiver.trace.provider.TraceModuleProvider +import org.apache.skywalking.oap.server.receiver.trace.provider.handler.v8.grpc.SpanAttachedEventReportServiceHandler +import org.apache.skywalking.oap.server.receiver.trace.provider.handler.v8.grpc.TraceSegmentReportServiceHandler +import org.apache.skywalking.oap.server.receiver.trace.provider.handler.v8.grpc.TraceSegmentReportServiceHandlerCompat +import org.apache.skywalking.oap.server.receiver.trace.provider.handler.v8.rest.TraceSegmentReportHandler +import spp.platform.core.vcs.ServiceVCS + +/** + * Overrides the default [TraceSegmentReportServiceHandler] to add the VCS service name to the request. + */ +class VCSTraceModuleProvider : TraceModuleProvider() { + + override fun name(): String = "spp-receiver-trace" + + override fun start() { + val grpcHandlerRegister = manager.find(SharingServerModule.NAME) + .provider() + .getService(GRPCHandlerRegister::class.java) + val httpHandlerRegister = manager.find(SharingServerModule.NAME) + .provider() + .getService(HTTPHandlerRegister::class.java) + + val traceSegmentReportServiceHandler = VCSTraceSegmentReportServiceHandler(manager) + grpcHandlerRegister.addHandler(traceSegmentReportServiceHandler) + grpcHandlerRegister.addHandler(TraceSegmentReportServiceHandlerCompat(traceSegmentReportServiceHandler)) + grpcHandlerRegister.addHandler(SpanAttachedEventReportServiceHandler(manager)) + + httpHandlerRegister.addHandler( + TraceSegmentReportHandler(manager), + listOf(HttpMethod.POST) + ) + } + + private class VCSTraceSegmentReportServiceHandler( + moduleManager: ModuleManager, + private val delegate: TraceSegmentReportServiceHandler = TraceSegmentReportServiceHandler(moduleManager) + ) : TraceSegmentReportServiceHandler(moduleManager) { + + override fun collect(responseObserver: StreamObserver): StreamObserver { + val streamObserver = delegate.collect(responseObserver) + return object : StreamObserver { + override fun onNext(value: SegmentObject) = streamObserver.onNext( + value.toBuilder().setService( + ServiceVCS.getServiceName(value) + ).build() + ) + + override fun onError(t: Throwable) = streamObserver.onError(t) + override fun onCompleted() = streamObserver.onCompleted() + } + } + + override fun collectInSync(request: SegmentCollection, responseObserver: StreamObserver) { + delegate.collectInSync(request, object : StreamObserver { + override fun onNext(value: Commands) { + //todo: ContinuousProfilingPolicyQuery.ServiceName + responseObserver.onNext(value) + } + + override fun onError(t: Throwable) = responseObserver.onError(t) + override fun onCompleted() = responseObserver.onCompleted() + }) + } + } +} diff --git a/platform/core/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleDefine b/platform/core/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleDefine index f935ae2de..60882f3f4 100644 --- a/platform/core/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleDefine +++ b/platform/core/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleDefine @@ -1 +1 @@ -spp.platform.core.SourceCoreModule +spp.platform.core.SourceCoreModule \ No newline at end of file diff --git a/platform/core/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider b/platform/core/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider index 8dc582168..89236e625 100644 --- a/platform/core/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider +++ b/platform/core/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider @@ -1 +1,7 @@ spp.platform.core.SourceCoreProvider +spp.platform.core.vcs.providers.VCSTraceModuleProvider +spp.platform.core.vcs.providers.VCSEventModuleProvider +spp.platform.core.vcs.providers.VCSJVMModuleProvider +spp.platform.core.vcs.providers.VCSLogModuleProvider +spp.platform.core.vcs.providers.VCSRegisterModuleProvider +spp.platform.core.vcs.providers.VCSMeterReceiverProvider \ No newline at end of file diff --git a/platform/core/src/test/kotlin/integration/JWTTest.kt b/platform/core/src/test/kotlin/integration/JWTTest.kt index 69d985de1..d44645f45 100644 --- a/platform/core/src/test/kotlin/integration/JWTTest.kt +++ b/platform/core/src/test/kotlin/integration/JWTTest.kt @@ -29,6 +29,7 @@ import spp.protocol.marshall.ServiceExceptionConverter import spp.protocol.platform.auth.AccessType.BLACK_LIST import spp.protocol.platform.auth.DeveloperRole import spp.protocol.platform.auth.RolePermission.ADD_LIVE_BREAKPOINT +import spp.protocol.platform.general.Service import spp.protocol.service.LiveInstrumentService import spp.protocol.service.error.InstrumentAccessDenied import spp.protocol.service.error.PermissionAccessDenied @@ -42,7 +43,7 @@ class JWTTest : PlatformIntegrationTest() { val testContext = VertxTestContext() instrumentService.addLiveInstrument( LiveBreakpoint( - location = LiveSourceLocation(JWTTest::class.java.name, 1, "spp-test-probe"), + location = LiveSourceLocation(JWTTest::class.java.name, 1, Service.fromName("spp-test-probe")), condition = "1 == 2" ) ).onComplete { diff --git a/platform/processor/live-instrument/build.gradle.kts b/platform/processor/live-instrument/build.gradle.kts index faa7b6a53..2a430c9e0 100644 --- a/platform/processor/live-instrument/build.gradle.kts +++ b/platform/processor/live-instrument/build.gradle.kts @@ -39,6 +39,7 @@ dependencies { compileOnly(project(":platform:storage")) testImplementation(project(":probes:jvm:boot")) + testCompileOnly(project(":probes:jvm:common")) testImplementation("org.apache.logging.log4j:log4j-core:2.20.0") //todo: properly add test dependency testImplementation(project(":platform:common").dependencyProject.extensions.getByType(SourceSetContainer::class).test.get().output) diff --git a/platform/processor/live-instrument/src/main/kotlin/spp/processor/instrument/impl/LiveLogAnalyzer.kt b/platform/processor/live-instrument/src/main/kotlin/spp/processor/instrument/impl/LiveLogAnalyzer.kt index 705f53c16..cffc9d356 100644 --- a/platform/processor/live-instrument/src/main/kotlin/spp/processor/instrument/impl/LiveLogAnalyzer.kt +++ b/platform/processor/live-instrument/src/main/kotlin/spp/processor/instrument/impl/LiveLogAnalyzer.kt @@ -39,6 +39,7 @@ import spp.protocol.artifact.log.LogOrderType import spp.protocol.artifact.log.LogResult import spp.protocol.instrument.event.LiveLogHit import spp.protocol.instrument.location.LiveSourceLocation +import spp.protocol.platform.general.Service import java.time.Instant import java.util.concurrent.TimeUnit import kotlin.collections.set @@ -84,7 +85,7 @@ class LiveLogAnalyzer : LogAnalysisListener, LogAnalysisListenerFactory { LiveSourceLocation( logSource, logLineNumber ?: -1, - service = logData.service, + service = Service.fromName(logData.service), serviceInstance = logData.serviceInstance ) } else null diff --git a/platform/processor/live-instrument/src/test/kotlin/integration/LiveInstrumentEventsTest.kt b/platform/processor/live-instrument/src/test/kotlin/integration/LiveInstrumentEventsTest.kt index cd992833a..e55e6bfb0 100644 --- a/platform/processor/live-instrument/src/test/kotlin/integration/LiveInstrumentEventsTest.kt +++ b/platform/processor/live-instrument/src/test/kotlin/integration/LiveInstrumentEventsTest.kt @@ -26,6 +26,7 @@ import org.junit.jupiter.api.Test import spp.protocol.instrument.LiveBreakpoint import spp.protocol.instrument.event.* import spp.protocol.instrument.location.LiveSourceLocation +import spp.protocol.platform.general.Service import spp.protocol.service.listen.LiveInstrumentListener import spp.protocol.service.listen.addLiveInstrumentListener import java.util.concurrent.atomic.AtomicInteger @@ -66,7 +67,7 @@ class LiveInstrumentEventsTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( LiveInstrumentEventsTest::class.java.name, getLineNumber("done"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), applyImmediately = true, id = instrumentId diff --git a/platform/processor/live-instrument/src/test/kotlin/integration/LiveInstrumentTest.kt b/platform/processor/live-instrument/src/test/kotlin/integration/LiveInstrumentTest.kt index dd0ac9445..ca1997616 100644 --- a/platform/processor/live-instrument/src/test/kotlin/integration/LiveInstrumentTest.kt +++ b/platform/processor/live-instrument/src/test/kotlin/integration/LiveInstrumentTest.kt @@ -26,6 +26,7 @@ import org.junit.jupiter.api.parallel.Isolated import spp.protocol.instrument.LiveBreakpoint import spp.protocol.instrument.LiveLog import spp.protocol.instrument.location.LiveSourceLocation +import spp.protocol.platform.general.Service import spp.protocol.service.listen.addBreakpointHitListener import spp.protocol.service.listen.addLogHitListener import java.util.concurrent.atomic.AtomicInteger @@ -59,7 +60,12 @@ class LiveInstrumentTest : LiveInstrumentIntegrationTest() { @Test fun getLiveInstrumentById(): Unit = runBlocking { val instrument = instrumentService.addLiveInstrument( - LiveBreakpoint(location = LiveSourceLocation("integration.LiveInstrumentTest", 1, "spp-test-probe")) + LiveBreakpoint( + location = LiveSourceLocation( + "integration.LiveInstrumentTest", 1, + Service.fromName("spp-test-probe") + ) + ) ).await() val originalId = instrument.id!! @@ -73,8 +79,18 @@ class LiveInstrumentTest : LiveInstrumentIntegrationTest() { fun getLiveInstrumentByIds(): Unit = runBlocking { val instrument = instrumentService.addLiveInstruments( listOf( - LiveBreakpoint(location = LiveSourceLocation("integration.LiveInstrumentTest", 1, "spp-test-probe")), - LiveBreakpoint(location = LiveSourceLocation("integration.LiveInstrumentTest", 2, "spp-test-probe")) + LiveBreakpoint( + location = LiveSourceLocation( + "integration.LiveInstrumentTest", 1, + Service.fromName("spp-test-probe") + ) + ), + LiveBreakpoint( + location = LiveSourceLocation( + "integration.LiveInstrumentTest", 2, + Service.fromName("spp-test-probe") + ) + ) ) ).await() @@ -120,7 +136,7 @@ class LiveInstrumentTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( LiveInstrumentTest::class.java.name, getLineNumber("done"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), applyImmediately = true, id = "$testNameAsInstrumentId-log" @@ -131,7 +147,7 @@ class LiveInstrumentTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( LiveInstrumentTest::class.java.name, getLineNumber("done"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), applyImmediately = true, id = "$testNameAsInstrumentId-breakpoint" @@ -166,7 +182,7 @@ class LiveInstrumentTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( LiveInstrumentTest::class.java.name, getLineNumber("done"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), "1==2", applyImmediately = true, @@ -178,7 +194,7 @@ class LiveInstrumentTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( LiveInstrumentTest::class.java.name, getLineNumber("done"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), applyImmediately = true, id = "$testNameAsInstrumentId-breakpoint" diff --git a/platform/processor/live-instrument/src/test/kotlin/integration/LiveVariablePresentationTest.kt b/platform/processor/live-instrument/src/test/kotlin/integration/LiveVariablePresentationTest.kt index d1251bee9..e5e6991e8 100644 --- a/platform/processor/live-instrument/src/test/kotlin/integration/LiveVariablePresentationTest.kt +++ b/platform/processor/live-instrument/src/test/kotlin/integration/LiveVariablePresentationTest.kt @@ -25,6 +25,7 @@ import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Test import spp.protocol.instrument.LiveBreakpoint import spp.protocol.instrument.location.LiveSourceLocation +import spp.protocol.platform.general.Service import spp.protocol.service.listen.addBreakpointHitListener import java.math.BigInteger import java.time.* @@ -199,7 +200,7 @@ class LiveVariablePresentationTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( LiveVariablePresentationTest::class.java.name, getLineNumber("done"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), applyImmediately = true, id = testNameAsInstrumentId diff --git a/platform/processor/live-instrument/src/test/kotlin/integration/NegativePrimitiveConditionITTest.kt b/platform/processor/live-instrument/src/test/kotlin/integration/NegativePrimitiveConditionITTest.kt index 38d916a50..f9b066589 100644 --- a/platform/processor/live-instrument/src/test/kotlin/integration/NegativePrimitiveConditionITTest.kt +++ b/platform/processor/live-instrument/src/test/kotlin/integration/NegativePrimitiveConditionITTest.kt @@ -25,6 +25,7 @@ import org.junit.jupiter.api.Assumptions.assumeTrue import org.junit.jupiter.api.Test import spp.protocol.instrument.LiveBreakpoint import spp.protocol.instrument.location.LiveSourceLocation +import spp.protocol.platform.general.Service import spp.protocol.service.listen.addBreakpointHitListener @Suppress("UNUSED_VARIABLE", "unused") @@ -73,7 +74,7 @@ class NegativePrimitiveConditionITTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( NegativePrimitiveConditionITTest::class.java.name, getLineNumber("done"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), condition = "staticFields[fieldI] == 100", applyImmediately = true @@ -107,7 +108,7 @@ class NegativePrimitiveConditionITTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( NegativePrimitiveConditionITTest::class.java.name, getLineNumber("done"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), condition = "fields[instanceI] == 100", applyImmediately = true @@ -141,7 +142,7 @@ class NegativePrimitiveConditionITTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( NegativePrimitiveConditionITTest::class.java.name, getLineNumber("done"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), condition = "localVariables[localI] == 100", applyImmediately = true diff --git a/platform/processor/live-instrument/src/test/kotlin/integration/PrimitiveConditionITTest.kt b/platform/processor/live-instrument/src/test/kotlin/integration/PrimitiveConditionITTest.kt index ed9e020f1..ec3f6aa0a 100644 --- a/platform/processor/live-instrument/src/test/kotlin/integration/PrimitiveConditionITTest.kt +++ b/platform/processor/live-instrument/src/test/kotlin/integration/PrimitiveConditionITTest.kt @@ -23,6 +23,7 @@ import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Test import spp.protocol.instrument.LiveBreakpoint import spp.protocol.instrument.location.LiveSourceLocation +import spp.protocol.platform.general.Service import spp.protocol.service.listen.addBreakpointHitListener @Suppress("UNUSED_VARIABLE", "unused") @@ -65,7 +66,7 @@ class PrimitiveConditionITTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( PrimitiveConditionITTest::class.java.name, getLineNumber("done"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), condition = "staticFields[fieldI] == 100", applyImmediately = true, @@ -96,7 +97,7 @@ class PrimitiveConditionITTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( PrimitiveConditionITTest::class.java.name, getLineNumber("done"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), condition = "fields[instanceI] == 100", applyImmediately = true, @@ -127,7 +128,7 @@ class PrimitiveConditionITTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( PrimitiveConditionITTest::class.java.name, getLineNumber("done"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), condition = "localVariables[localI] == 100", applyImmediately = true, diff --git a/platform/processor/live-instrument/src/test/kotlin/integration/RemoveInstrumentsTest.kt b/platform/processor/live-instrument/src/test/kotlin/integration/RemoveInstrumentsTest.kt index 943c86051..c2501c5c6 100644 --- a/platform/processor/live-instrument/src/test/kotlin/integration/RemoveInstrumentsTest.kt +++ b/platform/processor/live-instrument/src/test/kotlin/integration/RemoveInstrumentsTest.kt @@ -25,6 +25,7 @@ import org.junit.jupiter.api.Test import spp.protocol.instrument.LiveBreakpoint import spp.protocol.instrument.event.LiveInstrumentRemoved import spp.protocol.instrument.location.LiveSourceLocation +import spp.protocol.platform.general.Service import spp.protocol.service.listen.LiveInstrumentListener import spp.protocol.service.listen.addLiveInstrumentListener import java.util.concurrent.atomic.AtomicInteger @@ -39,11 +40,17 @@ class RemoveInstrumentsTest : LiveInstrumentIntegrationTest() { val instruments = instrumentService.addLiveInstruments( listOf( LiveBreakpoint( - location = LiveSourceLocation(RemoveInstrumentsTest::class.java.name, service = "spp-test-probe"), + location = LiveSourceLocation( + RemoveInstrumentsTest::class.java.name, + service = Service.fromName("spp-test-probe") + ), id = testNameAsUniqueInstrumentId ), LiveBreakpoint( - location = LiveSourceLocation(RemoveInstrumentsTest::class.java.name, service = "spp-test-probe"), + location = LiveSourceLocation( + RemoveInstrumentsTest::class.java.name, + service = Service.fromName("spp-test-probe") + ), id = testNameAsUniqueInstrumentId ) ) @@ -56,7 +63,10 @@ class RemoveInstrumentsTest : LiveInstrumentIntegrationTest() { override fun onInstrumentRemovedEvent(event: LiveInstrumentRemoved) { testContext.verify { assertEquals( - LiveSourceLocation(RemoveInstrumentsTest::class.java.name, service = "spp-test-probe"), + LiveSourceLocation( + RemoveInstrumentsTest::class.java.name, + service = Service.fromName("spp-test-probe") + ), event.instrument.location ) @@ -71,7 +81,10 @@ class RemoveInstrumentsTest : LiveInstrumentIntegrationTest() { } val removeInstruments = instrumentService.removeLiveInstruments( - LiveSourceLocation(RemoveInstrumentsTest::class.java.name, service = "spp-test-probe") + LiveSourceLocation( + RemoveInstrumentsTest::class.java.name, + service = Service.fromName("spp-test-probe") + ) ).await() assertEquals(2, removeInstruments.size) diff --git a/platform/processor/live-instrument/src/test/kotlin/integration/SimplePrimitivesLiveInstrumentTest.kt b/platform/processor/live-instrument/src/test/kotlin/integration/SimplePrimitivesLiveInstrumentTest.kt index 18a7810de..16a324e6c 100644 --- a/platform/processor/live-instrument/src/test/kotlin/integration/SimplePrimitivesLiveInstrumentTest.kt +++ b/platform/processor/live-instrument/src/test/kotlin/integration/SimplePrimitivesLiveInstrumentTest.kt @@ -24,6 +24,7 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import spp.protocol.instrument.LiveBreakpoint import spp.protocol.instrument.location.LiveSourceLocation +import spp.protocol.platform.general.Service import spp.protocol.service.listen.addBreakpointHitListener class SimplePrimitivesLiveInstrumentTest : LiveInstrumentIntegrationTest() { @@ -134,7 +135,7 @@ class SimplePrimitivesLiveInstrumentTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( SimplePrimitivesLiveInstrumentTest::class.java.name, getLineNumber("done"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), applyImmediately = true, id = testNameAsInstrumentId diff --git a/platform/processor/live-instrument/src/test/kotlin/integration/VCSLiveInstrumentIT.kt b/platform/processor/live-instrument/src/test/kotlin/integration/VCSLiveInstrumentIT.kt new file mode 100644 index 000000000..ace56f802 --- /dev/null +++ b/platform/processor/live-instrument/src/test/kotlin/integration/VCSLiveInstrumentIT.kt @@ -0,0 +1,121 @@ +/* + * Source++, the continuous feedback platform for developers. + * Copyright (C) 2022-2023 CodeBrig, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package integration + +import io.vertx.core.json.JsonObject +import io.vertx.junit5.VertxTestContext +import io.vertx.kotlin.coroutines.await +import kotlinx.coroutines.delay +import kotlinx.coroutines.runBlocking +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.parallel.Isolated +import spp.probe.ProbeConfiguration +import spp.protocol.instrument.LiveBreakpoint +import spp.protocol.instrument.location.LiveSourceLocation +import spp.protocol.platform.general.Service +import spp.protocol.service.listen.addBreakpointHitListener +import java.util.concurrent.atomic.AtomicInteger + +@Isolated +class VCSLiveInstrumentIT : LiveInstrumentIntegrationTest() { + + private fun doTest() { + startEntrySpan("doTest") + addLineLabel("done") { Throwable().stackTrace[0].lineNumber } + stopSpan() + } + + @Test + fun `listen to versioned breakpoint hits by instrument`(): Unit = runBlocking { + setupLineLabels { + doTest() + } + + val probeId = ProbeConfiguration.PROBE_ID + managementService.updateActiveProbeMetadata( + probeId, + JsonObject().put( + "application", + JsonObject().put("git_commit", "test1") + ) + ).await() + delay(2000) + + val hitCount = AtomicInteger() + var testContext = VertxTestContext() + val instrument = instrumentService.addLiveInstrument( + LiveBreakpoint( + location = LiveSourceLocation( + VCSLiveInstrumentIT::class.java.name, + getLineNumber("done"), + Service.fromName("spp-test-probe") + ), + applyImmediately = true, + id = testNameAsUniqueInstrumentId, + hitLimit = 2 + ) + ).await() + vertx.addBreakpointHitListener(instrument.id!!) { bpHit -> + log.info("Received breakpoint hit: $bpHit") + testContext.verify { + assertTrue(bpHit.stackTrace.elements.isNotEmpty()) + val topFrame = bpHit.stackTrace.elements.first() + assertEquals(1, topFrame.variables.size) + } + + if (hitCount.incrementAndGet() == 1) { + testContext.verify { + assertEquals( + instrument.location.service?.name + + "|" + instrument.location.service?.environment.toString() + + "|" + "test1", + bpHit.service + ) + } + } else { + testContext.verify { + assertEquals( + instrument.location.service?.name + + "|" + instrument.location.service?.environment.toString() + + "|" + "test2", + bpHit.service + ) + } + } + testContext.completeNow() + }.await() + + doTest() + errorOnTimeout(testContext) + testContext = VertxTestContext() + + managementService.updateActiveProbeMetadata( + probeId, + JsonObject().put( + "application", + JsonObject().put("git_commit", "test2") + ) + ).await() + delay(2000) + + doTest() + errorOnTimeout(testContext) + } +} diff --git a/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/AtomicValueLiveBreakpointTest.kt b/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/AtomicValueLiveBreakpointTest.kt index 75446cd49..0e36b2435 100644 --- a/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/AtomicValueLiveBreakpointTest.kt +++ b/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/AtomicValueLiveBreakpointTest.kt @@ -28,6 +28,7 @@ import org.junit.jupiter.api.Test import spp.protocol.instrument.LiveBreakpoint import spp.protocol.instrument.location.LiveSourceLocation import spp.protocol.instrument.variable.LiveVariable +import spp.protocol.platform.general.Service import spp.protocol.service.listen.addBreakpointHitListener import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicReference @@ -116,7 +117,7 @@ class AtomicValueLiveBreakpointTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( AtomicValueLiveBreakpointTest::class.java.name, getLineNumber("done"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), applyImmediately = true ) diff --git a/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/BreakpointRedactionTest.kt b/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/BreakpointRedactionTest.kt index dbd6618bb..69fb54272 100644 --- a/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/BreakpointRedactionTest.kt +++ b/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/BreakpointRedactionTest.kt @@ -29,6 +29,7 @@ import spp.protocol.platform.auth.DeveloperRole import spp.protocol.platform.auth.RedactionType import spp.protocol.platform.auth.RedactionType.IDENTIFIER_MATCH import spp.protocol.platform.auth.RolePermission +import spp.protocol.platform.general.Service import spp.protocol.service.LiveInstrumentService import spp.protocol.service.listen.addBreakpointHitListener @@ -105,7 +106,7 @@ class BreakpointRedactionTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( BreakpointRedactionTest::class.java.name, getLineNumber("done"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), applyImmediately = true ) diff --git a/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/CyclicObjectLiveBreakpointTest.kt b/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/CyclicObjectLiveBreakpointTest.kt index a4ba6d874..7ba742c5e 100644 --- a/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/CyclicObjectLiveBreakpointTest.kt +++ b/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/CyclicObjectLiveBreakpointTest.kt @@ -27,6 +27,7 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import spp.protocol.instrument.LiveBreakpoint import spp.protocol.instrument.location.LiveSourceLocation +import spp.protocol.platform.general.Service import spp.protocol.service.listen.addBreakpointHitListener class CyclicObjectLiveBreakpointTest : LiveInstrumentIntegrationTest() { @@ -98,7 +99,7 @@ class CyclicObjectLiveBreakpointTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( CyclicObjectLiveBreakpointTest::class.java.name, getLineNumber("done"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), applyImmediately = true, id = testNameAsInstrumentId diff --git a/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/DeepObjectLiveBreakpointTest.kt b/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/DeepObjectLiveBreakpointTest.kt index 25bcba1b5..90f3292cc 100644 --- a/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/DeepObjectLiveBreakpointTest.kt +++ b/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/DeepObjectLiveBreakpointTest.kt @@ -28,6 +28,7 @@ import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Test import spp.protocol.instrument.LiveBreakpoint import spp.protocol.instrument.location.LiveSourceLocation +import spp.protocol.platform.general.Service import spp.protocol.service.listen.addBreakpointHitListener @Suppress("unused", "UNUSED_VARIABLE") @@ -133,7 +134,7 @@ class DeepObjectLiveBreakpointTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( DeepObjectLiveBreakpointTest::class.java.name, getLineNumber("done"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), applyImmediately = true, id = testNameAsInstrumentId diff --git a/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/HitLimitLiveBreakpointTest.kt b/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/HitLimitLiveBreakpointTest.kt index 89d9fdd97..7b133c05f 100644 --- a/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/HitLimitLiveBreakpointTest.kt +++ b/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/HitLimitLiveBreakpointTest.kt @@ -28,6 +28,7 @@ import spp.protocol.instrument.LiveBreakpoint import spp.protocol.instrument.event.LiveInstrumentRemoved import spp.protocol.instrument.location.LiveSourceLocation import spp.protocol.instrument.throttle.InstrumentThrottle +import spp.protocol.platform.general.Service import spp.protocol.service.listen.LiveInstrumentListener import spp.protocol.service.listen.addBreakpointHitListener import spp.protocol.service.listen.addLiveInstrumentListener @@ -53,7 +54,7 @@ class HitLimitLiveBreakpointTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( HitLimitLiveBreakpointTest::class.java.name, getLineNumber("done"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), hitLimit = 11, throttle = InstrumentThrottle.NONE, diff --git a/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/InnerClassBreakpointTest.kt b/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/InnerClassBreakpointTest.kt index dbbcc6a79..15655af1c 100644 --- a/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/InnerClassBreakpointTest.kt +++ b/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/InnerClassBreakpointTest.kt @@ -26,6 +26,7 @@ import org.junit.jupiter.api.Test import spp.protocol.instrument.LiveBreakpoint import spp.protocol.instrument.location.LiveSourceLocation import spp.protocol.instrument.variable.LiveVariableScope +import spp.protocol.platform.general.Service import spp.protocol.service.listen.addBreakpointHitListener @Suppress("UNUSED_VARIABLE") @@ -52,7 +53,7 @@ class InnerClassBreakpointTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( InnerClass::class.java.name, getLineNumber("done"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), applyImmediately = true, id = testNameAsUniqueInstrumentId diff --git a/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/LargeArrayLiveBreakpointTest.kt b/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/LargeArrayLiveBreakpointTest.kt index 9dea22329..892a13027 100644 --- a/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/LargeArrayLiveBreakpointTest.kt +++ b/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/LargeArrayLiveBreakpointTest.kt @@ -27,6 +27,7 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import spp.protocol.instrument.LiveBreakpoint import spp.protocol.instrument.location.LiveSourceLocation +import spp.protocol.platform.general.Service import spp.protocol.service.listen.addBreakpointHitListener import java.util.* @@ -84,7 +85,7 @@ class LargeArrayLiveBreakpointTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( LargeArrayLiveBreakpointTest::class.java.name, getLineNumber("done"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), applyImmediately = true ) diff --git a/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/LargeListLiveBreakpointTest.kt b/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/LargeListLiveBreakpointTest.kt index df1a2e643..5de2abcd3 100644 --- a/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/LargeListLiveBreakpointTest.kt +++ b/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/LargeListLiveBreakpointTest.kt @@ -27,6 +27,7 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import spp.protocol.instrument.LiveBreakpoint import spp.protocol.instrument.location.LiveSourceLocation +import spp.protocol.platform.general.Service import spp.protocol.service.listen.addBreakpointHitListener class LargeListLiveBreakpointTest : LiveInstrumentIntegrationTest() { @@ -85,7 +86,7 @@ class LargeListLiveBreakpointTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( LargeListLiveBreakpointTest::class.java.name, getLineNumber("done"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), applyImmediately = true, id = testNameAsInstrumentId diff --git a/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/LargeMapLiveBreakpointTest.kt b/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/LargeMapLiveBreakpointTest.kt index 6f9c655d8..47fdd8904 100644 --- a/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/LargeMapLiveBreakpointTest.kt +++ b/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/LargeMapLiveBreakpointTest.kt @@ -26,6 +26,7 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import spp.protocol.instrument.LiveBreakpoint import spp.protocol.instrument.location.LiveSourceLocation +import spp.protocol.platform.general.Service import spp.protocol.service.listen.addBreakpointHitListener class LargeMapLiveBreakpointTest : LiveInstrumentIntegrationTest() { @@ -82,7 +83,7 @@ class LargeMapLiveBreakpointTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( LargeMapLiveBreakpointTest::class.java.name, getLineNumber("done"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), applyImmediately = true, id = testNameAsInstrumentId diff --git a/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/LargeObjectLiveBreakpointTest.kt b/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/LargeObjectLiveBreakpointTest.kt index 4d53fd586..12e4d84bc 100644 --- a/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/LargeObjectLiveBreakpointTest.kt +++ b/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/LargeObjectLiveBreakpointTest.kt @@ -27,6 +27,7 @@ import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Test import spp.protocol.instrument.LiveBreakpoint import spp.protocol.instrument.location.LiveSourceLocation +import spp.protocol.platform.general.Service import spp.protocol.service.listen.addBreakpointHitListener @Suppress("UNUSED_VARIABLE") @@ -83,7 +84,7 @@ class LargeObjectLiveBreakpointTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( LargeObjectLiveBreakpointTest::class.java.name, getLineNumber("done"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), applyImmediately = true, id = testNameAsInstrumentId diff --git a/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/LargePrimitiveArrayLiveBreakpointTest.kt b/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/LargePrimitiveArrayLiveBreakpointTest.kt index 1401eb2f9..95cc61ff1 100644 --- a/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/LargePrimitiveArrayLiveBreakpointTest.kt +++ b/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/LargePrimitiveArrayLiveBreakpointTest.kt @@ -27,6 +27,7 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import spp.protocol.instrument.LiveBreakpoint import spp.protocol.instrument.location.LiveSourceLocation +import spp.protocol.platform.general.Service import spp.protocol.service.listen.addBreakpointHitListener import java.util.* @@ -83,7 +84,7 @@ class LargePrimitiveArrayLiveBreakpointTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( LargePrimitiveArrayLiveBreakpointTest::class.java.name, getLineNumber("done"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), applyImmediately = true, id = testNameAsInstrumentId diff --git a/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/LargeSetLiveBreakpointTest.kt b/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/LargeSetLiveBreakpointTest.kt index 28b1b5111..1062c2d79 100644 --- a/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/LargeSetLiveBreakpointTest.kt +++ b/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/LargeSetLiveBreakpointTest.kt @@ -27,6 +27,7 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import spp.protocol.instrument.LiveBreakpoint import spp.protocol.instrument.location.LiveSourceLocation +import spp.protocol.platform.general.Service import spp.protocol.service.listen.addBreakpointHitListener class LargeSetLiveBreakpointTest : LiveInstrumentIntegrationTest() { @@ -84,7 +85,7 @@ class LargeSetLiveBreakpointTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( LargeSetLiveBreakpointTest::class.java.name, getLineNumber("done"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), applyImmediately = true, id = testNameAsInstrumentId diff --git a/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/LiveBreakpointTest.kt b/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/LiveBreakpointTest.kt index 2d66a9115..8c858feb4 100644 --- a/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/LiveBreakpointTest.kt +++ b/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/LiveBreakpointTest.kt @@ -28,6 +28,7 @@ import spp.protocol.instrument.LiveBreakpoint import spp.protocol.instrument.event.* import spp.protocol.instrument.location.LiveSourceLocation import spp.protocol.marshall.ServiceExceptionConverter +import spp.protocol.platform.general.Service import spp.protocol.service.error.LiveInstrumentException import spp.protocol.service.listen.LiveInstrumentListener import spp.protocol.service.listen.addLiveInstrumentListener @@ -142,7 +143,7 @@ class LiveBreakpointTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( LiveBreakpointTest::class.java.name, getLineNumber("done"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), applyImmediately = true ) @@ -218,7 +219,7 @@ class LiveBreakpointTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( LiveBreakpointTest::class.java.name, getLineNumber("done"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), condition = "2==2", applyImmediately = true diff --git a/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/MetaLiveBreakpointTest.kt b/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/MetaLiveBreakpointTest.kt index 9e9b7c6b1..f3270e421 100644 --- a/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/MetaLiveBreakpointTest.kt +++ b/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/MetaLiveBreakpointTest.kt @@ -24,6 +24,7 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import spp.protocol.instrument.LiveBreakpoint import spp.protocol.instrument.location.LiveSourceLocation +import spp.protocol.platform.general.Service class MetaLiveBreakpointTest : LiveInstrumentIntegrationTest() { @@ -138,7 +139,7 @@ class MetaLiveBreakpointTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( MetaLiveBreakpointTest::class.java.name, getLineNumber("done"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), applyImmediately = true, id = testNameAsUniqueInstrumentId diff --git a/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/MultiLiveBreakpointTest.kt b/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/MultiLiveBreakpointTest.kt index d8d7b44dc..4db286609 100644 --- a/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/MultiLiveBreakpointTest.kt +++ b/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/MultiLiveBreakpointTest.kt @@ -29,6 +29,7 @@ import spp.protocol.artifact.exception.sourceAsLineNumber import spp.protocol.instrument.LiveBreakpoint import spp.protocol.instrument.event.LiveBreakpointHit import spp.protocol.instrument.location.LiveSourceLocation +import spp.protocol.platform.general.Service import spp.protocol.service.listen.addBreakpointHitListener import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit @@ -73,7 +74,7 @@ class MultiLiveBreakpointTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( MultiLiveBreakpointTest::class.java.name, getLineNumber("line1"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), applyImmediately = true, id = "$testNameAsInstrumentId-1" @@ -82,7 +83,7 @@ class MultiLiveBreakpointTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( MultiLiveBreakpointTest::class.java.name, getLineNumber("line1"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), applyImmediately = true, id = "$testNameAsInstrumentId-2" @@ -143,7 +144,7 @@ class MultiLiveBreakpointTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( MultiLiveBreakpointTest::class.java.name, getLineNumber("line1"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), applyImmediately = true, id = "$testNameAsInstrumentId-1" @@ -152,7 +153,7 @@ class MultiLiveBreakpointTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( MultiLiveBreakpointTest::class.java.name, getLineNumber("line2"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), applyImmediately = true, id = "$testNameAsInstrumentId-2" diff --git a/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/NullArrayLiveBreakpointTest.kt b/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/NullArrayLiveBreakpointTest.kt index 1b147aa65..8afe2fb23 100644 --- a/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/NullArrayLiveBreakpointTest.kt +++ b/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/NullArrayLiveBreakpointTest.kt @@ -27,6 +27,7 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import spp.protocol.instrument.LiveBreakpoint import spp.protocol.instrument.location.LiveSourceLocation +import spp.protocol.platform.general.Service import spp.protocol.service.listen.addBreakpointHitListener @Suppress("UNUSED_VARIABLE") @@ -76,7 +77,7 @@ class NullArrayLiveBreakpointTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( NullArrayLiveBreakpointTest::class.java.name, getLineNumber("done"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), applyImmediately = true, id = testNameAsInstrumentId diff --git a/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/RemoveByLocationLiveBreakpointTest.kt b/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/RemoveByLocationLiveBreakpointTest.kt index e7e46b7b1..659964578 100644 --- a/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/RemoveByLocationLiveBreakpointTest.kt +++ b/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/RemoveByLocationLiveBreakpointTest.kt @@ -28,6 +28,7 @@ import spp.protocol.instrument.LiveBreakpoint import spp.protocol.instrument.event.LiveBreakpointHit import spp.protocol.instrument.event.LiveInstrumentApplied import spp.protocol.instrument.location.LiveSourceLocation +import spp.protocol.platform.general.Service import spp.protocol.service.listen.LiveInstrumentListener import spp.protocol.service.listen.addBreakpointHitListener import spp.protocol.service.listen.addLiveInstrumentListener @@ -94,7 +95,7 @@ class RemoveByLocationLiveBreakpointTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( RemoveByLocationLiveBreakpointTest::class.java.name, getLineNumber("line1"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), hitLimit = 2, //applyImmediately = true, @@ -104,7 +105,7 @@ class RemoveByLocationLiveBreakpointTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( RemoveByLocationLiveBreakpointTest::class.java.name, getLineNumber("line2"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), hitLimit = 2, //applyImmediately = true, @@ -127,7 +128,7 @@ class RemoveByLocationLiveBreakpointTest : LiveInstrumentIntegrationTest() { LiveSourceLocation( RemoveByLocationLiveBreakpointTest::class.java.name, getLineNumber("line1"), - "spp-test-probe" + Service.fromName("spp-test-probe") ) ).await() testContext.verify { @@ -137,7 +138,10 @@ class RemoveByLocationLiveBreakpointTest : LiveInstrumentIntegrationTest() { //ensure line1 is removed and line2 is still there val remainingInstruments = instrumentService.getLiveInstrumentsByLocation( - LiveSourceLocation(RemoveByLocationLiveBreakpointTest::class.java.name, service = "spp-test-probe") + LiveSourceLocation( + RemoveByLocationLiveBreakpointTest::class.java.name, + service = Service.fromName("spp-test-probe") + ) ).await() testContext.verify { assertEquals(1, remainingInstruments.size) @@ -154,7 +158,10 @@ class RemoveByLocationLiveBreakpointTest : LiveInstrumentIntegrationTest() { assertEquals( 1, instrumentService.removeLiveInstruments( - LiveSourceLocation(RemoveByLocationLiveBreakpointTest::class.java.name, service = "spp-test-probe") + LiveSourceLocation( + RemoveByLocationLiveBreakpointTest::class.java.name, + service = Service.fromName("spp-test-probe") + ) ).await().size ) } diff --git a/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/SimpleCollectionsLiveBreakpointTest.kt b/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/SimpleCollectionsLiveBreakpointTest.kt index 6899e2479..1d5719d95 100644 --- a/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/SimpleCollectionsLiveBreakpointTest.kt +++ b/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/SimpleCollectionsLiveBreakpointTest.kt @@ -28,6 +28,7 @@ import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Test import spp.protocol.instrument.LiveBreakpoint import spp.protocol.instrument.location.LiveSourceLocation +import spp.protocol.platform.general.Service import spp.protocol.service.listen.addBreakpointHitListener @Suppress("UNUSED_VARIABLE") @@ -183,7 +184,7 @@ class SimpleCollectionsLiveBreakpointTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( SimpleCollectionsLiveBreakpointTest::class.java.name, getLineNumber("done"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), applyImmediately = true, id = testNameAsInstrumentId diff --git a/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/SmallMapLiveBreakpointTest.kt b/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/SmallMapLiveBreakpointTest.kt index 51da399a5..9ac945ab8 100644 --- a/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/SmallMapLiveBreakpointTest.kt +++ b/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/SmallMapLiveBreakpointTest.kt @@ -27,6 +27,7 @@ import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Test import spp.protocol.instrument.LiveBreakpoint import spp.protocol.instrument.location.LiveSourceLocation +import spp.protocol.platform.general.Service import spp.protocol.service.listen.addBreakpointHitListener class SmallMapLiveBreakpointTest : LiveInstrumentIntegrationTest() { @@ -104,7 +105,7 @@ class SmallMapLiveBreakpointTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( SmallMapLiveBreakpointTest::class.java.name, getLineNumber("done"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), applyImmediately = true ) @@ -159,7 +160,7 @@ class SmallMapLiveBreakpointTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( SmallMapLiveBreakpointTest::class.java.name, getLineNumber("done"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), applyImmediately = true ) @@ -214,7 +215,7 @@ class SmallMapLiveBreakpointTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( SmallMapLiveBreakpointTest::class.java.name, getLineNumber("done"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), applyImmediately = true ) diff --git a/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/ThrottleLiveBreakpointTest.kt b/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/ThrottleLiveBreakpointTest.kt index 02764fa3a..7a1b85f03 100644 --- a/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/ThrottleLiveBreakpointTest.kt +++ b/platform/processor/live-instrument/src/test/kotlin/integration/breakpoint/ThrottleLiveBreakpointTest.kt @@ -28,6 +28,7 @@ import spp.protocol.instrument.LiveBreakpoint import spp.protocol.instrument.location.LiveSourceLocation import spp.protocol.instrument.throttle.InstrumentThrottle import spp.protocol.instrument.throttle.ThrottleStep +import spp.protocol.platform.general.Service import spp.protocol.service.listen.addBreakpointHitListener import java.util.concurrent.atomic.AtomicInteger @@ -73,7 +74,7 @@ class ThrottleLiveBreakpointTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( ThrottleLiveBreakpointTest::class.java.name, getLineNumber("throttle1"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), hitLimit = -1, applyImmediately = true, @@ -119,7 +120,7 @@ class ThrottleLiveBreakpointTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( ThrottleLiveBreakpointTest::class.java.name, getLineNumber("throttle2"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), hitLimit = -1, throttle = InstrumentThrottle(2, ThrottleStep.SECOND), @@ -165,7 +166,7 @@ class ThrottleLiveBreakpointTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( ThrottleLiveBreakpointTest::class.java.name, getLineNumber("throttle3"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), hitLimit = -1, throttle = InstrumentThrottle.NONE, diff --git a/platform/processor/live-instrument/src/test/kotlin/integration/log/FormatLiveLogTest.kt b/platform/processor/live-instrument/src/test/kotlin/integration/log/FormatLiveLogTest.kt index 8c7eee76a..7bbe0606a 100644 --- a/platform/processor/live-instrument/src/test/kotlin/integration/log/FormatLiveLogTest.kt +++ b/platform/processor/live-instrument/src/test/kotlin/integration/log/FormatLiveLogTest.kt @@ -26,6 +26,7 @@ import org.junit.jupiter.api.Assertions.assertNotNull import org.junit.jupiter.api.Test import spp.protocol.instrument.LiveLog import spp.protocol.instrument.location.LiveSourceLocation +import spp.protocol.platform.general.Service import spp.protocol.service.listen.addLogHitListener class FormatLiveLogTest : LiveInstrumentIntegrationTest() { @@ -72,7 +73,7 @@ class FormatLiveLogTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( FormatLiveLogTest::class.java.name, getLineNumber("done"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), applyImmediately = true, id = testNameAsInstrumentId diff --git a/platform/processor/live-instrument/src/test/kotlin/integration/log/LiveLogTest.kt b/platform/processor/live-instrument/src/test/kotlin/integration/log/LiveLogTest.kt index 22339370c..27da8be56 100644 --- a/platform/processor/live-instrument/src/test/kotlin/integration/log/LiveLogTest.kt +++ b/platform/processor/live-instrument/src/test/kotlin/integration/log/LiveLogTest.kt @@ -29,6 +29,7 @@ import spp.protocol.instrument.LiveLog import spp.protocol.instrument.event.* import spp.protocol.instrument.location.LiveSourceLocation import spp.protocol.marshall.ServiceExceptionConverter +import spp.protocol.platform.general.Service import spp.protocol.service.error.LiveInstrumentException import spp.protocol.service.listen.LiveInstrumentListener import spp.protocol.service.listen.addLiveInstrumentListener @@ -101,7 +102,7 @@ class LiveLogTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( LiveLogTest::class.java.name, getLineNumber("done"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), logFormat = "addHitRemove", applyImmediately = true @@ -140,7 +141,7 @@ class LiveLogTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( "FakeClass", 4, - "spp-test-probe" + Service.fromName("spp-test-probe") ), condition = "1==2", logFormat = "removeById" @@ -185,7 +186,7 @@ class LiveLogTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( LiveLogTest::class.java.name, 100, - "spp-test-probe" + Service.fromName("spp-test-probe") ), condition = "1==2", logFormat = "removeMultipleByLocation" @@ -195,7 +196,7 @@ class LiveLogTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( LiveLogTest::class.java.name, 100, - "spp-test-probe" + Service.fromName("spp-test-probe") ), condition = "1==3", logFormat = "removeMultipleByLocation" @@ -208,7 +209,7 @@ class LiveLogTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( LiveLogTest::class.java.name, 100, - "spp-test-probe" + Service.fromName("spp-test-probe") ) ).onComplete { if (it.succeeded()) { @@ -240,7 +241,7 @@ class LiveLogTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( LiveLogTest::class.java.name, getLineNumber("done"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), condition = "1===2", logFormat = "addLogWithInvalidCondition", diff --git a/platform/processor/live-instrument/src/test/kotlin/integration/meter/LiveMeterCountTest.kt b/platform/processor/live-instrument/src/test/kotlin/integration/meter/LiveMeterCountTest.kt index 27aba57d0..c5fdaf125 100644 --- a/platform/processor/live-instrument/src/test/kotlin/integration/meter/LiveMeterCountTest.kt +++ b/platform/processor/live-instrument/src/test/kotlin/integration/meter/LiveMeterCountTest.kt @@ -32,6 +32,7 @@ import spp.protocol.instrument.location.LiveSourceLocation import spp.protocol.instrument.meter.MeterType import spp.protocol.instrument.meter.MetricValue import spp.protocol.instrument.meter.MetricValueType +import spp.protocol.platform.general.Service import spp.protocol.service.SourceServices.Subscribe.toLiveViewSubscriberAddress import spp.protocol.service.SourceServices.Subscribe.toLiveViewSubscription import spp.protocol.view.LiveView @@ -66,7 +67,7 @@ class LiveMeterCountTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( LiveMeterCountTest::class.java.name, getLineNumber("count1"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), id = testNameAsUniqueInstrumentId, applyImmediately = true @@ -77,6 +78,7 @@ class LiveMeterCountTest : LiveInstrumentIntegrationTest() { LiveView( entityIds = mutableSetOf(liveMeter.id!!), viewConfig = LiveViewConfig("test", listOf(liveMeter.id!!)), + service = Service.fromName("spp-test-probe") ) ).await().subscriptionId!! @@ -130,7 +132,7 @@ class LiveMeterCountTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( LiveMeterCountTest::class.java.name, getLineNumber("count2"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), id = testNameAsUniqueInstrumentId, applyImmediately = true @@ -140,7 +142,8 @@ class LiveMeterCountTest : LiveInstrumentIntegrationTest() { val subscriptionId = viewService.addLiveView( LiveView( entityIds = mutableSetOf(liveMeter.id!!), - viewConfig = LiveViewConfig("test", listOf(liveMeter.id!!)) + viewConfig = LiveViewConfig("test", listOf(liveMeter.id!!)), + service = Service.fromName("spp-test-probe") ) ).await().subscriptionId!! @@ -194,7 +197,7 @@ class LiveMeterCountTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( LiveMeterCountTest::class.java.name, getLineNumber("count3"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), id = testNameAsUniqueInstrumentId, applyImmediately = true @@ -204,7 +207,8 @@ class LiveMeterCountTest : LiveInstrumentIntegrationTest() { val subscriptionId1 = viewService.addLiveView( LiveView( entityIds = mutableSetOf(liveMeter1.id!!), - viewConfig = LiveViewConfig("test", listOf(liveMeter1.id!!)) + viewConfig = LiveViewConfig("test", listOf(liveMeter1.id!!)), + service = Service.fromName("spp-test-probe") ) ).await().subscriptionId!! @@ -214,7 +218,7 @@ class LiveMeterCountTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( LiveMeterCountTest::class.java.name, getLineNumber("count3"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), id = testNameAsUniqueInstrumentId, meta = mapOf("metric.mode" to "RATE"), @@ -225,7 +229,8 @@ class LiveMeterCountTest : LiveInstrumentIntegrationTest() { val subscriptionId2 = viewService.addLiveView( LiveView( entityIds = mutableSetOf(liveMeter2.id!!), - viewConfig = LiveViewConfig("test", listOf(liveMeter2.id!!)) + viewConfig = LiveViewConfig("test", listOf(liveMeter2.id!!)), + service = Service.fromName("spp-test-probe") ) ).await().subscriptionId!! diff --git a/platform/processor/live-instrument/src/test/kotlin/integration/meter/LiveMeterGaugeTest.kt b/platform/processor/live-instrument/src/test/kotlin/integration/meter/LiveMeterGaugeTest.kt index d4604ae87..a25ea34c7 100644 --- a/platform/processor/live-instrument/src/test/kotlin/integration/meter/LiveMeterGaugeTest.kt +++ b/platform/processor/live-instrument/src/test/kotlin/integration/meter/LiveMeterGaugeTest.kt @@ -24,13 +24,13 @@ import io.vertx.kotlin.coroutines.await import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test -import spp.protocol.artifact.ArtifactQualifiedName -import spp.protocol.artifact.ArtifactType +import org.junit.jupiter.api.parallel.Isolated import spp.protocol.instrument.LiveMeter import spp.protocol.instrument.location.LiveSourceLocation import spp.protocol.instrument.meter.MeterType import spp.protocol.instrument.meter.MetricValue import spp.protocol.instrument.meter.MetricValueType +import spp.protocol.platform.general.Service import spp.protocol.service.SourceServices.Subscribe.toLiveViewSubscription import spp.protocol.view.LiveView import spp.protocol.view.LiveViewConfig @@ -42,6 +42,7 @@ import java.io.Serializable import java.util.* import java.util.function.Supplier +@Isolated class LiveMeterGaugeTest : LiveInstrumentIntegrationTest() { @Suppress("UNUSED_VARIABLE") @@ -71,7 +72,7 @@ class LiveMeterGaugeTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( LiveMeterGaugeTest::class.java.name, getLineNumber("done"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), id = instrumentId, applyImmediately = true, @@ -95,18 +96,11 @@ class LiveMeterGaugeTest : LiveInstrumentIntegrationTest() { val subscriptionId = viewService.addLiveView( LiveView( entityIds = mutableSetOf(liveMeter.id!!), - artifactQualifiedName = ArtifactQualifiedName( - LiveMeterGaugeTest::class.java.name, - type = ArtifactType.EXPRESSION - ), - artifactLocation = LiveSourceLocation( - LiveMeterGaugeTest::class.java.name, - getLineNumber("done") - ), viewConfig = LiveViewConfig( "test", listOf(liveMeter.id!!) - ) + ), + service = Service.fromName("spp-test-probe") ) ).await().subscriptionId!! @@ -157,7 +151,7 @@ class LiveMeterGaugeTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( LiveMeterGaugeTest::class.java.name, getLineNumber("done"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), id = testNameAsUniqueInstrumentId, applyImmediately = true, @@ -181,7 +175,8 @@ class LiveMeterGaugeTest : LiveInstrumentIntegrationTest() { val subscriptionId = viewService.addLiveView( LiveView( entityIds = mutableSetOf(liveMeter.id!!), - viewConfig = LiveViewConfig("test", listOf(liveMeter.id!!)) + viewConfig = LiveViewConfig("test", listOf(liveMeter.id!!)), + service = Service.fromName("spp-test-probe") ) ).await().subscriptionId!! diff --git a/platform/processor/live-instrument/src/test/kotlin/integration/meter/LiveMeterRateTest.kt b/platform/processor/live-instrument/src/test/kotlin/integration/meter/LiveMeterRateTest.kt index 04e80046f..734f2ab64 100644 --- a/platform/processor/live-instrument/src/test/kotlin/integration/meter/LiveMeterRateTest.kt +++ b/platform/processor/live-instrument/src/test/kotlin/integration/meter/LiveMeterRateTest.kt @@ -28,13 +28,12 @@ import org.junit.jupiter.api.Assumptions.assumeTrue import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.parallel.Isolated -import spp.protocol.artifact.ArtifactQualifiedName -import spp.protocol.artifact.ArtifactType import spp.protocol.instrument.LiveMeter import spp.protocol.instrument.location.LiveSourceLocation import spp.protocol.instrument.meter.MeterType import spp.protocol.instrument.meter.MetricValue import spp.protocol.instrument.meter.MetricValueType +import spp.protocol.platform.general.Service import spp.protocol.service.SourceServices.Subscribe.toLiveViewSubscription import spp.protocol.view.LiveView import spp.protocol.view.LiveViewConfig @@ -66,7 +65,7 @@ class LiveMeterRateTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( LiveMeterRateTest::class.java.name, getLineNumber("done"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), id = testNameAsUniqueInstrumentId, meta = mapOf("metric.mode" to "RATE"), @@ -91,18 +90,11 @@ class LiveMeterRateTest : LiveInstrumentIntegrationTest() { val subscriptionId = viewService.addLiveView( LiveView( entityIds = mutableSetOf(liveMeter.id!!), - artifactQualifiedName = ArtifactQualifiedName( - LiveMeterRateTest::class.java.name, - type = ArtifactType.EXPRESSION - ), - artifactLocation = LiveSourceLocation( - LiveMeterRateTest::class.java.name, - getLineNumber("done") - ), viewConfig = LiveViewConfig( "test", listOf(liveMeter.id!!) - ) + ), + service = Service.fromName("spp-test-probe") ) ).await().subscriptionId!! diff --git a/platform/processor/live-instrument/src/test/resources/spp-test-probe.yml b/platform/processor/live-instrument/src/test/resources/spp-test-probe.yml index 1d122e096..53d378ec3 100644 --- a/platform/processor/live-instrument/src/test/resources/spp-test-probe.yml +++ b/platform/processor/live-instrument/src/test/resources/spp-test-probe.yml @@ -4,6 +4,11 @@ spp: client_secret: "test-secret" platform_host: "localhost" platform_port: 12800 + application: + environment: ${SPP_APPLICATION_ENVIRONMENT:-} + version: ${SPP_APPLICATION_VERSION:-} + git_commit: ${SPP_APPLICATION_GIT_COMMIT:-} + skywalking: logging: level: "DEBUG" diff --git a/platform/processor/live-view/build.gradle.kts b/platform/processor/live-view/build.gradle.kts index 941d2b9cc..2a25f5a75 100644 --- a/platform/processor/live-view/build.gradle.kts +++ b/platform/processor/live-view/build.gradle.kts @@ -41,8 +41,18 @@ dependencies { compileOnly("org.apache.skywalking:skywalking-meter-receiver-plugin:$skywalkingVersion") { isTransitive = false } + compileOnly("org.apache.skywalking:skywalking-jvm-receiver-plugin:$skywalkingVersion") { + isTransitive = false + } + compileOnly("org.apache.skywalking:skywalking-log-recevier-plugin:$skywalkingVersion") { + isTransitive = false + } + compileOnly("org.apache.skywalking:skywalking-management-receiver-plugin:$skywalkingVersion") { + isTransitive = false + } testImplementation(project(":probes:jvm:boot")) + testCompileOnly(project(":probes:jvm:common")) testImplementation("org.apache.logging.log4j:log4j-core:2.20.0") //todo: properly add test dependency testImplementation(project(":platform:common").dependencyProject.extensions.getByType(SourceSetContainer::class).test.get().output) diff --git a/platform/processor/live-view/src/main/kotlin/spp/processor/view/ViewProcessor.kt b/platform/processor/live-view/src/main/kotlin/spp/processor/view/ViewProcessor.kt index 064ce0024..23a88b99e 100644 --- a/platform/processor/live-view/src/main/kotlin/spp/processor/view/ViewProcessor.kt +++ b/platform/processor/live-view/src/main/kotlin/spp/processor/view/ViewProcessor.kt @@ -42,7 +42,7 @@ import spp.platform.common.FeedbackProcessor import spp.platform.storage.ExpiringSharedData import spp.platform.storage.SourceStorage import spp.processor.view.impl.LiveViewServiceImpl -import spp.processor.view.impl.view.model.ClusterMetrics +import spp.processor.view.model.ClusterMetrics import spp.protocol.platform.auth.RolePermission import spp.protocol.platform.developer.SelfInfo import spp.protocol.service.LiveManagementService diff --git a/platform/processor/live-view/src/main/kotlin/spp/processor/view/impl/LiveMeterProcessService.kt b/platform/processor/live-view/src/main/kotlin/spp/processor/view/impl/LiveMeterProcessService.kt index fbac9d2f3..11931ee96 100644 --- a/platform/processor/live-view/src/main/kotlin/spp/processor/view/impl/LiveMeterProcessService.kt +++ b/platform/processor/live-view/src/main/kotlin/spp/processor/view/impl/LiveMeterProcessService.kt @@ -18,16 +18,31 @@ package spp.processor.view.impl import org.apache.skywalking.apm.network.language.agent.v3.MeterData -import org.apache.skywalking.oap.server.analyzer.provider.meter.process.IMeterProcessService +import org.apache.skywalking.oap.meter.analyzer.MetricConvert +import org.apache.skywalking.oap.server.analyzer.provider.meter.config.MeterConfig import org.apache.skywalking.oap.server.analyzer.provider.meter.process.MeterProcessService import org.apache.skywalking.oap.server.analyzer.provider.meter.process.MeterProcessor +import org.apache.skywalking.oap.server.library.module.ModuleManager import spp.processor.view.model.LiveMetricConvert import spp.protocol.instrument.LiveMeter +import spp.protocol.instrument.meter.MeterPartition +import spp.protocol.view.rule.RulePartition -class LiveMeterProcessService(private val delegate: MeterProcessService) : IMeterProcessService by delegate { +/** + * Replaces the default meter process service to allows for processing + * meters with [MeterPartition]s via [RulePartition]s. + */ +class LiveMeterProcessService( + private val delegate: MeterProcessService, + manager: ModuleManager, +) : MeterProcessService(manager) { private val existingPartitions = mutableSetOf() + override fun start(configs: MutableList?) { + delegate.start(configs) + } + override fun createProcessor(): MeterProcessor { val processor = delegate.createProcessor() return object : MeterProcessor(delegate) { @@ -58,4 +73,8 @@ class LiveMeterProcessService(private val delegate: MeterProcessService) : IMete override fun process() = processor.process() } } + + override fun converts(): MutableList { + return delegate.converts() + } } diff --git a/platform/processor/live-view/src/main/kotlin/spp/processor/view/impl/LiveViewServiceImpl.kt b/platform/processor/live-view/src/main/kotlin/spp/processor/view/impl/LiveViewServiceImpl.kt index f90fdb68e..a669a7f56 100644 --- a/platform/processor/live-view/src/main/kotlin/spp/processor/view/impl/LiveViewServiceImpl.kt +++ b/platform/processor/live-view/src/main/kotlin/spp/processor/view/impl/LiveViewServiceImpl.kt @@ -24,7 +24,9 @@ import io.vertx.core.json.JsonArray import io.vertx.core.json.JsonObject import io.vertx.kotlin.coroutines.CoroutineVerticle import io.vertx.kotlin.coroutines.await +import io.vertx.kotlin.coroutines.dispatcher import io.vertx.serviceproxy.ServiceException +import kotlinx.coroutines.runBlocking import mu.KotlinLogging import org.apache.skywalking.oap.log.analyzer.module.LogAnalyzerModule import org.apache.skywalking.oap.log.analyzer.provider.log.ILogAnalyzerService @@ -59,16 +61,18 @@ import spp.processor.view.impl.view.LiveMeterView import spp.processor.view.impl.view.LiveTraceView import spp.processor.view.impl.view.util.EntitySubscribersCache import spp.processor.view.impl.view.util.MetricTypeSubscriptionCache -import spp.processor.view.impl.view.util.ViewSubscriber import spp.processor.view.model.LiveMeterConfig import spp.processor.view.model.LiveMetricConvert import spp.processor.view.model.LiveMetricConvert.Companion.NOP_RULE +import spp.processor.view.model.ViewSubscriber import spp.protocol.artifact.metrics.MetricStep import spp.protocol.artifact.metrics.MetricType import spp.protocol.artifact.trace.TraceSpan import spp.protocol.artifact.trace.TraceSpanRef import spp.protocol.artifact.trace.TraceStack import spp.protocol.platform.PlatformAddress.MARKER_DISCONNECTED +import spp.protocol.platform.general.Service +import spp.protocol.service.LiveManagementService import spp.protocol.service.LiveViewService import spp.protocol.service.SourceServices.Subscribe.toLiveViewSubscriberAddress import spp.protocol.service.SourceServices.Subscribe.toLiveViewSubscription @@ -252,7 +256,6 @@ class LiveViewServiceImpl : CoroutineVerticle(), LiveViewService { LiveViewEvent( sub.subscriptionId!!, sub.entityIds.first(), - sub.artifactQualifiedName, firstEvent.getString("timeBucket"), sub.viewConfig, events.toString() @@ -261,7 +264,6 @@ class LiveViewServiceImpl : CoroutineVerticle(), LiveViewService { LiveViewEvent( sub.subscriptionId!!, sub.entityIds.first(), - sub.artifactQualifiedName, event.getString("timeBucket"), sub.viewConfig, event.toString() @@ -312,11 +314,11 @@ class LiveViewServiceImpl : CoroutineVerticle(), LiveViewService { if (unsubbedSubscriber != null) { val removedView = LiveView( - unsubbedSubscriber!!.subscription.subscriptionId, unsubbedSubscriber!!.subscription.entityIds, - unsubbedSubscriber!!.subscription.artifactQualifiedName, - unsubbedSubscriber!!.subscription.artifactLocation, - unsubbedSubscriber!!.subscription.viewConfig + unsubbedSubscriber!!.subscription.viewConfig, + unsubbedSubscriber!!.subscription.service, + unsubbedSubscriber!!.subscription.serviceInstance, + unsubbedSubscriber!!.subscription.subscriptionId, ) log.debug { "Removed live view: {}".args(removedView) } @@ -397,11 +399,11 @@ class LiveViewServiceImpl : CoroutineVerticle(), LiveViewService { if (subbedUser != null) { promise.complete( LiveView( - subbedUser!!.subscription.subscriptionId, subbedUser!!.subscription.entityIds, - subbedUser!!.subscription.artifactQualifiedName, - subbedUser!!.subscription.artifactLocation, - subbedUser!!.subscription.viewConfig + subbedUser!!.subscription.viewConfig, + subbedUser!!.subscription.service, + subbedUser!!.subscription.serviceInstance, + subbedUser!!.subscription.subscriptionId, ) ) } else { @@ -519,6 +521,57 @@ class LiveViewServiceImpl : CoroutineVerticle(), LiveViewService { entityId: String, duration: Duration, labels: List + ): JsonArray { + val service = try { + Service.fromId(entityId) + } catch (ignored: Exception) { + null + } + + return if (service != null) { + //todo: get VCS services more efficiently (search by active commits) + val devAuth = Vertx.currentContext().getLocal("developer") + val managementService = LiveManagementService.createProxy(vertx, devAuth.accessToken) + val allServices = runBlocking(vertx.dispatcher()) { + managementService.getServices().await().map { it.withName(it.name) } + } + val searchServices = allServices.filter { + service.isSameLocation(service.withName(it.name)) + } + + val allMetrics = mutableListOf() + searchServices.forEach { + val array = searchService(metricType, it.id, labels, duration) + for (i in 0 until array.size()) { + array.getJsonObject(i).put("service", it.toJson()) + } + allMetrics.add(array) + } + + val mergeArray = JsonArray() + repeat(allMetrics.first().count()) { + mergeArray.add(JsonObject()) + } + allMetrics.forEach { + it.forEachIndexed { i, metrics -> + val a = mergeArray.getJsonObject(i) + val b = metrics as JsonObject + if (a.getValue("value") == null) { + a.mergeIn(b) + } //todo: more accurate merging (env, metric type, etc) + } + } + mergeArray + } else { + searchService(metricType, entityId, labels, duration) + } + } + + private fun searchService( + metricType: MetricType, + entityId: String, + labels: List, + duration: Duration ): JsonArray { val condition = MetricsCondition() condition.name = metricType.metricId @@ -531,13 +584,23 @@ class LiveViewServiceImpl : CoroutineVerticle(), LiveViewService { JsonArray().apply { metricsQuery.readLabeledMetricsValues(condition, labels, duration).forEach { val label = it.label - it.values.values.forEach { add(JsonObject().put("value", it.value).put("label", label)) } + it.values.values.forEach { + val valueOb = JsonObject().put("label", label) + if (!it.isEmptyValue) { + valueOb.put("value", it.value) + } + add(valueOb) + } } } } else { JsonArray().apply { metricsQuery.readMetricsValues(condition, duration).values.values.forEach { - add(JsonObject().put("value", it.value)) + val valueOb = JsonObject() + if (!it.isEmptyValue) { + valueOb.put("value", it.value) + } + add(valueOb) } } } diff --git a/platform/processor/live-view/src/main/kotlin/spp/processor/view/impl/SPPRemoteClient.kt b/platform/processor/live-view/src/main/kotlin/spp/processor/view/impl/SPPRemoteClient.kt index f6b4c4fde..2dd33c2ac 100644 --- a/platform/processor/live-view/src/main/kotlin/spp/processor/view/impl/SPPRemoteClient.kt +++ b/platform/processor/live-view/src/main/kotlin/spp/processor/view/impl/SPPRemoteClient.kt @@ -30,8 +30,8 @@ import org.apache.skywalking.oap.server.library.module.ModuleManager import org.joor.Reflect import spp.platform.common.ClusterConnection import spp.processor.view.ViewProcessor -import spp.processor.view.impl.view.model.ClusterMetrics import spp.processor.view.impl.view.util.EntityNaming +import spp.processor.view.model.ClusterMetrics import spp.protocol.artifact.metrics.MetricType class SPPRemoteClient( diff --git a/platform/processor/live-view/src/main/kotlin/spp/processor/view/impl/view/LiveLogView.kt b/platform/processor/live-view/src/main/kotlin/spp/processor/view/impl/view/LiveLogView.kt index 4133d9470..96ad24211 100644 --- a/platform/processor/live-view/src/main/kotlin/spp/processor/view/impl/view/LiveLogView.kt +++ b/platform/processor/live-view/src/main/kotlin/spp/processor/view/impl/view/LiveLogView.kt @@ -29,12 +29,12 @@ import org.apache.skywalking.oap.log.analyzer.provider.log.listener.LogAnalysisL import org.apache.skywalking.oap.server.core.analysis.Layer import spp.platform.common.ClusterConnection import spp.processor.view.ViewProcessor -import spp.processor.view.impl.view.model.LiveGaugeValueMetrics -import spp.processor.view.impl.view.util.EntityNaming.isSameService import spp.processor.view.impl.view.util.MetricTypeSubscriptionCache -import spp.processor.view.impl.view.util.ViewSubscriber +import spp.processor.view.model.LiveGaugeValueMetrics +import spp.processor.view.model.ViewSubscriber import spp.protocol.artifact.log.Log import spp.protocol.instrument.location.LiveSourceLocation +import spp.protocol.platform.general.Service import java.time.Instant import java.time.ZoneOffset import java.time.format.DateTimeFormatterBuilder @@ -76,14 +76,17 @@ class LiveLogView(private val subscriptionCache: MetricTypeSubscriptionCache) : var subbedArtifacts = subscriptionCache["endpoint_logs"] if (subbedArtifacts != null) { val logPattern = logData.body.text.text - var subs = subbedArtifacts[logPattern].orEmpty() + subbedArtifacts["*"].orEmpty() + var subs = subbedArtifacts[logPattern].orEmpty() + + subbedArtifacts["*"].orEmpty() //remove subscribers with additional filters subs = subs.filter { - val service = it.subscription.artifactLocation?.service - if (service != null && !isSameService(service, logData.service)) { - return@filter false - } + if (it.subscription.serviceInstance?.let { + it != logData.serviceInstance + } == true) return@filter false + if (it.subscription.service?.let { + !it.isSameLocation(it.withName(logData.service)) + } == true) return@filter false return@filter true }.toSet() @@ -92,14 +95,17 @@ class LiveLogView(private val subscriptionCache: MetricTypeSubscriptionCache) : subbedArtifacts = subscriptionCache["service_logs"] if (subbedArtifacts != null) { - var subs = subbedArtifacts[logData.service].orEmpty() + subbedArtifacts["*"].orEmpty() + var subs = subbedArtifacts[Service.fromName(logData.service).name].orEmpty() + + subbedArtifacts["*"].orEmpty() //remove subscribers with additional filters subs = subs.filter { - val service = it.subscription.artifactLocation?.service - if (service != null && !isSameService(service, logData.service)) { - return@filter false - } + if (it.subscription.serviceInstance?.let { + it != logData.serviceInstance + } == true) return@filter false + if (it.subscription.service?.let { + !it.isSameLocation(it.withName(logData.service)) + } == true) return@filter false return@filter true }.toSet() @@ -121,7 +127,7 @@ class LiveLogView(private val subscriptionCache: MetricTypeSubscriptionCache) : LiveSourceLocation( logSource, logLineNumber ?: -1, - service = logData.service, + service = Service.fromName(logData.service), serviceInstance = logData.serviceInstance ) } else null @@ -140,7 +146,6 @@ class LiveLogView(private val subscriptionCache: MetricTypeSubscriptionCache) : val event = JsonObject() .put("type", "LOGS") .put("multiMetrics", false) - .put("artifactQualifiedName", JsonObject.mapFrom(sub.subscription.artifactQualifiedName)) .put("entityId", logPattern) .put("timeBucket", formatter.format(logRecord.timestamp)) .put("log", JsonObject.mapFrom(logRecord)) diff --git a/platform/processor/live-view/src/main/kotlin/spp/processor/view/impl/view/LiveMeterView.kt b/platform/processor/live-view/src/main/kotlin/spp/processor/view/impl/view/LiveMeterView.kt index 1e1001251..255db8ad2 100644 --- a/platform/processor/live-view/src/main/kotlin/spp/processor/view/impl/view/LiveMeterView.kt +++ b/platform/processor/live-view/src/main/kotlin/spp/processor/view/impl/view/LiveMeterView.kt @@ -30,9 +30,12 @@ import spp.platform.common.util.args import spp.platform.storage.SourceStorage import spp.processor.view.ViewProcessor import spp.processor.view.ViewProcessor.realtimeMetricCache -import spp.processor.view.impl.view.util.* -import spp.processor.view.impl.view.util.EntityNaming.isSameService +import spp.processor.view.impl.view.util.EntityNaming +import spp.processor.view.impl.view.util.InternalRealtimeViewSubscriber +import spp.processor.view.impl.view.util.InternalViewSubscriber +import spp.processor.view.impl.view.util.MetricTypeSubscriptionCache import spp.processor.view.model.LiveMetricConvert +import spp.processor.view.model.ViewSubscriber import spp.protocol.instrument.event.LiveMeterHit import java.time.Instant import java.util.concurrent.CopyOnWriteArrayList @@ -106,14 +109,12 @@ class LiveMeterView(private val subscriptionCache: MetricTypeSubscriptionCache) //remove subscribers with additional filters subs = subs.filter { - val service = it.subscription.artifactLocation?.service - if (service != null && metricService != null && !isSameService(metricService, service)) { - return@filter false - } - val serviceInstance = it.subscription.artifactLocation?.serviceInstance - if (serviceInstance != null && serviceInstance != metricServiceInstance) { - return@filter false - } + if (it.subscription.serviceInstance?.let { + it != metricServiceInstance + } == true) return@filter false + if (it.subscription.service?.let { + !it.isSameLocation(it.withId(metricService)) + } == true) return@filter false return@filter true }.toSet() @@ -193,10 +194,6 @@ class LiveMeterView(private val subscriptionCache: MetricTypeSubscriptionCache) val multiMetrics = JsonArray() waitingEventsForBucket.forEach { val metricsOb = JsonObject.mapFrom(it) - .put( - "artifactQualifiedName", - JsonObject.mapFrom(sub.subscription.artifactQualifiedName) - ) .put("entityName", EntityNaming.getEntityName((metrics as WithMetadata).meta)) log.trace { "Sending multi-metrics $metricsOb to ${sub.subscriberId}" } diff --git a/platform/processor/live-view/src/main/kotlin/spp/processor/view/impl/view/LiveTraceView.kt b/platform/processor/live-view/src/main/kotlin/spp/processor/view/impl/view/LiveTraceView.kt index d51f7fb7f..40ba21453 100644 --- a/platform/processor/live-view/src/main/kotlin/spp/processor/view/impl/view/LiveTraceView.kt +++ b/platform/processor/live-view/src/main/kotlin/spp/processor/view/impl/view/LiveTraceView.kt @@ -29,7 +29,7 @@ import org.apache.skywalking.oap.server.library.module.ModuleManager import org.slf4j.LoggerFactory import spp.platform.common.ClusterConnection import spp.processor.view.impl.view.util.MetricTypeSubscriptionCache -import spp.processor.view.impl.view.util.ViewSubscriber +import spp.processor.view.model.ViewSubscriber import spp.protocol.artifact.trace.Trace import spp.protocol.artifact.trace.TraceSpan import spp.protocol.artifact.trace.TraceSpanLogEntry @@ -92,7 +92,7 @@ class LiveTraceView( Instant.ofEpochMilli(span.startTime), Instant.ofEpochMilli(span.endTime), entityId, - subs.first().subscription.artifactQualifiedName, + null, span.spanType.name, span.peer, span.componentId.toString(), @@ -129,7 +129,6 @@ class LiveTraceView( val event = JsonObject() .put("type", "TRACES") .put("multiMetrics", false) - .put("artifactQualifiedName", JsonObject.mapFrom(sub.subscription.artifactQualifiedName)) .put("entityId", entityId) .put("timeBucket", formatter.format(trace.start)) .put("trace", JsonObject.mapFrom(trace)) diff --git a/platform/processor/live-view/src/main/kotlin/spp/processor/view/impl/view/util/EntityNaming.kt b/platform/processor/live-view/src/main/kotlin/spp/processor/view/impl/view/util/EntityNaming.kt index 4e80a62b0..fc9b0d8ac 100644 --- a/platform/processor/live-view/src/main/kotlin/spp/processor/view/impl/view/util/EntityNaming.kt +++ b/platform/processor/live-view/src/main/kotlin/spp/processor/view/impl/view/util/EntityNaming.kt @@ -115,8 +115,4 @@ object EntityNaming { } return null } - - fun isSameService(serviceIdOrName: String, serviceName: String): Boolean { - return serviceIdOrName == serviceName || serviceIdOrName == IDManager.ServiceID.buildId(serviceName, true) - } } diff --git a/platform/processor/live-view/src/main/kotlin/spp/processor/view/impl/view/util/EntitySubscribersCache.kt b/platform/processor/live-view/src/main/kotlin/spp/processor/view/impl/view/util/EntitySubscribersCache.kt index 36120961c..ba7a87aed 100644 --- a/platform/processor/live-view/src/main/kotlin/spp/processor/view/impl/view/util/EntitySubscribersCache.kt +++ b/platform/processor/live-view/src/main/kotlin/spp/processor/view/impl/view/util/EntitySubscribersCache.kt @@ -17,6 +17,7 @@ */ package spp.processor.view.impl.view.util +import spp.processor.view.model.ViewSubscriber import java.util.concurrent.ConcurrentHashMap class EntitySubscribersCache : ConcurrentHashMap>() diff --git a/platform/processor/live-view/src/main/kotlin/spp/processor/view/impl/view/model/ClusterMetrics.kt b/platform/processor/live-view/src/main/kotlin/spp/processor/view/model/ClusterMetrics.kt similarity index 98% rename from platform/processor/live-view/src/main/kotlin/spp/processor/view/impl/view/model/ClusterMetrics.kt rename to platform/processor/live-view/src/main/kotlin/spp/processor/view/model/ClusterMetrics.kt index 72c296fcd..7dea04c5b 100644 --- a/platform/processor/live-view/src/main/kotlin/spp/processor/view/impl/view/model/ClusterMetrics.kt +++ b/platform/processor/live-view/src/main/kotlin/spp/processor/view/model/ClusterMetrics.kt @@ -15,7 +15,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package spp.processor.view.impl.view.model +package spp.processor.view.model import io.vertx.core.buffer.Buffer import io.vertx.core.shareddata.ClusterSerializable diff --git a/platform/processor/live-view/src/main/kotlin/spp/processor/view/impl/view/model/LiveGaugeValueMetrics.kt b/platform/processor/live-view/src/main/kotlin/spp/processor/view/model/LiveGaugeValueMetrics.kt similarity index 98% rename from platform/processor/live-view/src/main/kotlin/spp/processor/view/impl/view/model/LiveGaugeValueMetrics.kt rename to platform/processor/live-view/src/main/kotlin/spp/processor/view/model/LiveGaugeValueMetrics.kt index f08699753..499956c4a 100644 --- a/platform/processor/live-view/src/main/kotlin/spp/processor/view/impl/view/model/LiveGaugeValueMetrics.kt +++ b/platform/processor/live-view/src/main/kotlin/spp/processor/view/model/LiveGaugeValueMetrics.kt @@ -15,7 +15,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package spp.processor.view.impl.view.model +package spp.processor.view.model import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsMetaInfo diff --git a/platform/processor/live-view/src/main/kotlin/spp/processor/view/impl/view/util/ViewSubscriber.kt b/platform/processor/live-view/src/main/kotlin/spp/processor/view/model/ViewSubscriber.kt similarity index 97% rename from platform/processor/live-view/src/main/kotlin/spp/processor/view/impl/view/util/ViewSubscriber.kt rename to platform/processor/live-view/src/main/kotlin/spp/processor/view/model/ViewSubscriber.kt index 1e851c801..05646e6f8 100644 --- a/platform/processor/live-view/src/main/kotlin/spp/processor/view/impl/view/util/ViewSubscriber.kt +++ b/platform/processor/live-view/src/main/kotlin/spp/processor/view/model/ViewSubscriber.kt @@ -15,7 +15,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package spp.processor.view.impl.view.util +package spp.processor.view.model import io.vertx.core.eventbus.MessageConsumer import io.vertx.core.json.JsonObject diff --git a/platform/processor/live-view/src/main/kotlin/spp/processor/view/provider/LiveMeterReceiverProvider.kt b/platform/processor/live-view/src/main/kotlin/spp/processor/view/provider/LiveMeterReceiverProvider.kt deleted file mode 100644 index f2449a58b..000000000 --- a/platform/processor/live-view/src/main/kotlin/spp/processor/view/provider/LiveMeterReceiverProvider.kt +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Source++, the continuous feedback platform for developers. - * Copyright (C) 2022-2023 CodeBrig, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published - * by the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package spp.processor.view.provider - -import org.apache.skywalking.oap.server.analyzer.module.AnalyzerModule -import org.apache.skywalking.oap.server.analyzer.provider.meter.process.IMeterProcessService -import org.apache.skywalking.oap.server.analyzer.provider.meter.process.MeterProcessService -import org.apache.skywalking.oap.server.receiver.meter.provider.MeterReceiverProvider -import spp.processor.view.impl.LiveMeterProcessService -import spp.protocol.instrument.meter.MeterPartition -import spp.protocol.view.rule.RulePartition - -/** - * Replaces the default meter process service with the live meter process service. - * This is done to allow [LiveMeterProcessService] to process meters with [MeterPartition]s via [RulePartition]s. - */ -class LiveMeterReceiverProvider : MeterReceiverProvider() { - - override fun name(): String = "spp-live-meter-receiver" - - override fun start() { - val process = manager.find(AnalyzerModule.NAME) - .provider() - .getService(IMeterProcessService::class.java) as MeterProcessService - val analyzerModule = manager.find(AnalyzerModule.NAME).provider() - analyzerModule.registerServiceImplementation(IMeterProcessService::class.java, LiveMeterProcessService(process)) - super.start() - } -} diff --git a/platform/processor/live-view/src/main/kotlin/spp/processor/view/provider/LiveViewProvider.kt b/platform/processor/live-view/src/main/kotlin/spp/processor/view/provider/LiveViewProvider.kt index 3fc873c39..3db4db29d 100644 --- a/platform/processor/live-view/src/main/kotlin/spp/processor/view/provider/LiveViewProvider.kt +++ b/platform/processor/live-view/src/main/kotlin/spp/processor/view/provider/LiveViewProvider.kt @@ -23,6 +23,8 @@ import kotlinx.coroutines.launch import mu.KotlinLogging import org.apache.skywalking.oap.log.analyzer.module.LogAnalyzerModule import org.apache.skywalking.oap.server.analyzer.module.AnalyzerModule +import org.apache.skywalking.oap.server.analyzer.provider.meter.process.IMeterProcessService +import org.apache.skywalking.oap.server.analyzer.provider.meter.process.MeterProcessService import org.apache.skywalking.oap.server.core.CoreModule import org.apache.skywalking.oap.server.core.CoreModuleConfig import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord @@ -46,6 +48,7 @@ import org.joor.Reflect import spp.platform.common.ClusterConnection import spp.processor.view.ViewProcessor import spp.processor.view.ViewProcessor.liveViewService +import spp.processor.view.impl.LiveMeterProcessService import spp.processor.view.impl.SPPRemoteClient import java.util.concurrent.atomic.AtomicReference @@ -95,6 +98,15 @@ class LiveViewProvider : ModuleProvider() { override fun start() { log.info("Starting spp-live-view") + val process = manager.find(AnalyzerModule.NAME) + .provider() + .getService(IMeterProcessService::class.java) as MeterProcessService + val analyzerModule = manager.find(AnalyzerModule.NAME).provider() + analyzerModule.registerServiceImplementation( + IMeterProcessService::class.java, + LiveMeterProcessService(process, manager) + ) + val sppRemoteClient = AtomicReference() class SPPRemoteClientManager : RemoteClientManager { diff --git a/platform/processor/live-view/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider b/platform/processor/live-view/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider index 1213ca308..ab1f02b2f 100644 --- a/platform/processor/live-view/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider +++ b/platform/processor/live-view/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider @@ -1,2 +1 @@ -spp.processor.view.provider.LiveViewProvider -spp.processor.view.provider.LiveMeterReceiverProvider \ No newline at end of file +spp.processor.view.provider.LiveViewProvider \ No newline at end of file diff --git a/platform/processor/live-view/src/test/kotlin/integration/LiveLogSubscriptionTest.kt b/platform/processor/live-view/src/test/kotlin/integration/LiveLogSubscriptionTest.kt index e393144ed..8cc75277a 100644 --- a/platform/processor/live-view/src/test/kotlin/integration/LiveLogSubscriptionTest.kt +++ b/platform/processor/live-view/src/test/kotlin/integration/LiveLogSubscriptionTest.kt @@ -29,6 +29,7 @@ import org.junit.jupiter.api.parallel.Isolated import spp.protocol.artifact.log.Log import spp.protocol.instrument.LiveLog import spp.protocol.instrument.location.LiveSourceLocation +import spp.protocol.platform.general.Service import spp.protocol.service.SourceServices.Subscribe.toLiveViewSubscription import spp.protocol.view.LiveView import spp.protocol.view.LiveViewConfig @@ -54,7 +55,7 @@ class LiveLogSubscriptionTest : LiveInstrumentIntegrationTest() { LiveSourceLocation( LiveLogSubscriptionTest::class.java.name, getLineNumber("done"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), id = testNameAsUniqueInstrumentId, hitLimit = 5, @@ -65,7 +66,7 @@ class LiveLogSubscriptionTest : LiveInstrumentIntegrationTest() { LiveView( entityIds = mutableSetOf(liveLog.logFormat), viewConfig = LiveViewConfig("test", listOf("endpoint_logs")), - artifactLocation = LiveSourceLocation("", service = "spp-test-probe") + service = Service.fromName("spp-test-probe") ) ).await().subscriptionId!! log.info("Using subscription id: {}", subscriptionId) diff --git a/platform/processor/live-view/src/test/kotlin/integration/LiveMeterPartitionTest.kt b/platform/processor/live-view/src/test/kotlin/integration/LiveMeterPartitionTest.kt index 33295bc3c..8263fca0c 100644 --- a/platform/processor/live-view/src/test/kotlin/integration/LiveMeterPartitionTest.kt +++ b/platform/processor/live-view/src/test/kotlin/integration/LiveMeterPartitionTest.kt @@ -25,9 +25,11 @@ import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertNotNull import org.junit.jupiter.api.Test +import org.junit.jupiter.api.parallel.Isolated import spp.protocol.instrument.LiveMeter import spp.protocol.instrument.location.LiveSourceLocation import spp.protocol.instrument.meter.* +import spp.protocol.platform.general.Service import spp.protocol.service.SourceServices.Subscribe.toLiveViewSubscription import spp.protocol.view.LiveView import spp.protocol.view.LiveViewConfig @@ -35,6 +37,7 @@ import spp.protocol.view.LiveViewEvent import spp.protocol.view.rule.RulePartition import spp.protocol.view.rule.ViewRule +@Isolated class LiveMeterPartitionTest : LiveInstrumentIntegrationTest() { @Suppress("UNUSED_VARIABLE") @@ -62,7 +65,7 @@ class LiveMeterPartitionTest : LiveInstrumentIntegrationTest() { location = LiveSourceLocation( LiveMeterPartitionTest::class.java.name, getLineNumber("done"), - "spp-test-probe" + Service.fromName("spp-test-probe") ), id = testNameAsUniqueInstrumentId, applyImmediately = true @@ -88,7 +91,8 @@ class LiveMeterPartitionTest : LiveInstrumentIntegrationTest() { viewConfig = LiveViewConfig( "test", listOf("${liveMeter.id}_true", "${liveMeter.id}_false") - ) + ), + service = Service.fromName("spp-test-probe") ) ).await().subscriptionId!! diff --git a/platform/processor/live-view/src/test/kotlin/integration/MetricsSerializationTest.kt b/platform/processor/live-view/src/test/kotlin/integration/MetricsSerializationTest.kt index 0ea745616..c33ed0e64 100644 --- a/platform/processor/live-view/src/test/kotlin/integration/MetricsSerializationTest.kt +++ b/platform/processor/live-view/src/test/kotlin/integration/MetricsSerializationTest.kt @@ -22,7 +22,7 @@ import io.vertx.kotlin.coroutines.await import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test -import spp.processor.view.impl.view.model.ClusterMetrics +import spp.processor.view.model.ClusterMetrics import java.util.* class MetricsSerializationTest : PlatformIntegrationTest() { diff --git a/platform/processor/live-view/src/test/kotlin/integration/RealtimeLiveViewTest.kt b/platform/processor/live-view/src/test/kotlin/integration/RealtimeLiveViewTest.kt index d9bd8ab6a..dd88ef72c 100644 --- a/platform/processor/live-view/src/test/kotlin/integration/RealtimeLiveViewTest.kt +++ b/platform/processor/live-view/src/test/kotlin/integration/RealtimeLiveViewTest.kt @@ -26,7 +26,7 @@ import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Test import org.junit.jupiter.api.parallel.Isolated import spp.protocol.artifact.metrics.MetricType -import spp.protocol.instrument.location.LiveSourceLocation +import spp.protocol.platform.general.Service import spp.protocol.service.SourceServices.Subscribe.toLiveViewSubscription import spp.protocol.view.LiveView import spp.protocol.view.LiveViewConfig @@ -38,8 +38,6 @@ class RealtimeLiveViewTest : PlatformIntegrationTest() { @Test fun `realtime instance_jvm_cpu`(): Unit = runBlocking { - viewService.clearLiveViews().await() - val subscriptionId = viewService.addLiveView( LiveView( entityIds = mutableSetOf(MetricType.INSTANCE_JVM_CPU.asRealtime().metricId), @@ -47,7 +45,7 @@ class RealtimeLiveViewTest : PlatformIntegrationTest() { "test", listOf(MetricType.INSTANCE_JVM_CPU.asRealtime().metricId) ), - artifactLocation = LiveSourceLocation("", service = "spp-test-probe") + service = Service.fromName("spp-test-probe") ) ).await().subscriptionId!! diff --git a/platform/processor/live-view/src/test/kotlin/integration/VCSHistoricalViewIT.kt b/platform/processor/live-view/src/test/kotlin/integration/VCSHistoricalViewIT.kt new file mode 100644 index 000000000..3065f5bed --- /dev/null +++ b/platform/processor/live-view/src/test/kotlin/integration/VCSHistoricalViewIT.kt @@ -0,0 +1,126 @@ +/* + * Source++, the continuous feedback platform for developers. + * Copyright (C) 2022-2023 CodeBrig, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package integration + +import io.vertx.core.json.JsonObject +import io.vertx.kotlin.coroutines.await +import kotlinx.coroutines.delay +import kotlinx.coroutines.runBlocking +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Assumptions.assumeTrue +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.parallel.Isolated +import spp.probe.ProbeConfiguration +import spp.protocol.artifact.metrics.MetricStep +import spp.protocol.instrument.LiveMeter +import spp.protocol.instrument.location.LiveSourceLocation +import spp.protocol.instrument.meter.MeterType +import spp.protocol.instrument.meter.MetricValue +import spp.protocol.instrument.meter.MetricValueType +import spp.protocol.platform.general.Service +import spp.protocol.view.LiveView +import spp.protocol.view.LiveViewConfig +import spp.protocol.view.rule.ViewRule +import java.time.Instant +import java.time.temporal.ChronoUnit + +@Isolated +class VCSHistoricalViewIT : LiveInstrumentIntegrationTest() { + + private fun doTest() { + addLineLabel("done") { Throwable().stackTrace[0].lineNumber } + } + + @Test + fun `vcs historical view`(): Unit = runBlocking { + assumeTrue("true" == System.getProperty("test.includeSlow")) + setupLineLabels { + doTest() + } + + val liveMeter = LiveMeter( + MeterType.GAUGE, + MetricValue(MetricValueType.NUMBER, "2"), + location = LiveSourceLocation( + VCSHistoricalViewIT::class.java.name, + getLineNumber("done"), + Service.fromName("spp-test-probe") + ), + id = testNameAsUniqueInstrumentId, + applyImmediately = true + ) + + val rule = viewService.saveRule( + ViewRule( + name = liveMeter.id!!, + exp = buildString { + append("(") + append(liveMeter.id) + append(".downsampling(LATEST)") + append(").service(['service'], Layer.GENERAL)") + }, + meterIds = listOf(liveMeter.id!!) + ) + ).await() + + val subscriptionId = viewService.addLiveView( + LiveView( + entityIds = mutableSetOf(liveMeter.id!!), + viewConfig = LiveViewConfig("test", listOf(liveMeter.id!!)), + service = Service.fromName("spp-test-probe") + ) + ).await().subscriptionId!! + + instrumentService.addLiveInstrument(liveMeter).await() + doTest() + delay(75_000) + + //update commit id + val probeId = ProbeConfiguration.PROBE_ID + managementService.updateActiveProbeMetadata( + probeId, + JsonObject().put( + "application", + JsonObject().put("git_commit", "test1") + ) + ).await() + delay(75_000) + + val stop = Instant.now().truncatedTo(ChronoUnit.MINUTES) + val start = stop.minusSeconds(5 * 60L) + val historicalView = viewService.getHistoricalMetrics( + listOf(Service.fromName("spp-test-probe").id), + listOf(liveMeter.id!!), + MetricStep.MINUTE, start, stop + ).await() + assertTrue(historicalView.data.map { it as JsonObject }.any { + val service = Service(it.getJsonObject("service")) + it.getString("value") == "2" && service.commitId == "test" + }) + assertTrue(historicalView.data.map { it as JsonObject }.any { + val service = Service(it.getJsonObject("service")) + it.getString("value") == "2" && service.commitId == "test1" + }) + + //clean up + assertNotNull(instrumentService.removeLiveInstrument(liveMeter.id!!).await()) + assertNotNull(viewService.removeLiveView(subscriptionId).await()) + assertNotNull(viewService.deleteRule(rule.name).await()) + } +} diff --git a/platform/processor/live-view/src/test/kotlin/integration/VCSLiveViewIT.kt b/platform/processor/live-view/src/test/kotlin/integration/VCSLiveViewIT.kt new file mode 100644 index 000000000..5f90ed9e1 --- /dev/null +++ b/platform/processor/live-view/src/test/kotlin/integration/VCSLiveViewIT.kt @@ -0,0 +1,173 @@ +/* + * Source++, the continuous feedback platform for developers. + * Copyright (C) 2022-2023 CodeBrig, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package integration + +import io.vertx.core.json.JsonObject +import io.vertx.junit5.VertxTestContext +import io.vertx.kotlin.coroutines.await +import kotlinx.coroutines.delay +import kotlinx.coroutines.runBlocking +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.parallel.Isolated +import spp.probe.ProbeConfiguration +import spp.protocol.artifact.metrics.MetricType +import spp.protocol.platform.general.Service +import spp.protocol.service.SourceServices.Subscribe.toLiveViewSubscription +import spp.protocol.view.LiveView +import spp.protocol.view.LiveViewConfig +import spp.protocol.view.LiveViewEvent + +@Isolated +class VCSLiveViewIT : PlatformIntegrationTest() { + + @Test + fun `specific versioned realtime instance_jvm_cpu`(): Unit = runBlocking { + val subscriptionId = viewService.addLiveView( + LiveView( + entityIds = mutableSetOf(MetricType.INSTANCE_JVM_CPU.asRealtime().metricId), + viewConfig = LiveViewConfig( + "test", + listOf(MetricType.INSTANCE_JVM_CPU.asRealtime().metricId) + ), + service = Service.fromName("spp-test-probe").withCommitId("test1"), + ) + ).await().subscriptionId!! + + val probeId = ProbeConfiguration.PROBE_ID + managementService.updateActiveProbeMetadata( + probeId, + JsonObject().put( + "application", + JsonObject().put("git_commit", "test1") + ) + ).await() + delay(2000) + var testContext = VertxTestContext() + verifyHit( + testContext, + subscriptionId, + Service.fromName("spp-test-probe").withCommitId("test1").id + ) + if (testContext.failed()) { + throw testContext.causeOfFailure() + } + + managementService.updateActiveProbeMetadata( + probeId, + JsonObject().put( + "application", + JsonObject().put("git_commit", "test2") + ) + ).await() + delay(2000) + testContext = VertxTestContext() + verifyHit( + testContext, + subscriptionId, + Service.fromName("spp-test-probe").withCommitId("test2").id, + false + ) + if (testContext.failed()) { + throw testContext.causeOfFailure() + } + + //clean up + assertNotNull(viewService.removeLiveView(subscriptionId).await()) + } + + @Test + fun `versioned realtime instance_jvm_cpu`(): Unit = runBlocking { + val subscriptionId = viewService.addLiveView( + LiveView( + entityIds = mutableSetOf(MetricType.INSTANCE_JVM_CPU.asRealtime().metricId), + viewConfig = LiveViewConfig( + "test", + listOf(MetricType.INSTANCE_JVM_CPU.asRealtime().metricId) + ), + service = Service.fromName("spp-test-probe") + ) + ).await().subscriptionId!! + + val probeId = ProbeConfiguration.PROBE_ID + managementService.updateActiveProbeMetadata( + probeId, + JsonObject().put( + "application", + JsonObject().put("git_commit", "test1") + ) + ).await() + delay(2000) + var testContext = VertxTestContext() + verifyHit( + testContext, + subscriptionId, + Service.fromName("spp-test-probe").withCommitId("test1").id + ) + if (testContext.failed()) { + throw testContext.causeOfFailure() + } + + managementService.updateActiveProbeMetadata( + probeId, + JsonObject().put( + "application", + JsonObject().put("git_commit", "test2") + ) + ).await() + delay(2000) + testContext = VertxTestContext() + verifyHit( + testContext, + subscriptionId, + Service.fromName("spp-test-probe").withCommitId("test2").id + ) + if (testContext.failed()) { + throw testContext.causeOfFailure() + } + + //clean up + assertNotNull(viewService.removeLiveView(subscriptionId).await()) + } + + private suspend fun verifyHit( + testContext: VertxTestContext, + subscriptionId: String, + verifyServiceId: String, + errorOnTimeout: Boolean = true + ) { + val consumer = vertx.eventBus().consumer(toLiveViewSubscription(subscriptionId)) + consumer.handler { + val liveViewEvent = LiveViewEvent(it.body()) + val rawMetrics = JsonObject(liveViewEvent.metricsData) + log.info("Received metrics: $rawMetrics") + + testContext.verify { + assertEquals(verifyServiceId, rawMetrics.getString("serviceId")) + } + testContext.completeNow() + } + if (errorOnTimeout) { + errorOnTimeout(testContext) + } else { + successOnTimeout(testContext) + } + consumer.unregister().await() + } +} diff --git a/platform/processor/live-view/src/test/kotlin/spp/processor/view/impl/view/model/ClusterMetricsTest.kt b/platform/processor/live-view/src/test/kotlin/spp/processor/view/model/ClusterMetricsTest.kt similarity index 98% rename from platform/processor/live-view/src/test/kotlin/spp/processor/view/impl/view/model/ClusterMetricsTest.kt rename to platform/processor/live-view/src/test/kotlin/spp/processor/view/model/ClusterMetricsTest.kt index 0429ebb56..0997f5b1f 100644 --- a/platform/processor/live-view/src/test/kotlin/spp/processor/view/impl/view/model/ClusterMetricsTest.kt +++ b/platform/processor/live-view/src/test/kotlin/spp/processor/view/model/ClusterMetricsTest.kt @@ -15,7 +15,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package spp.processor.view.impl.view.model +package spp.processor.view.model import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity import org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue diff --git a/platform/processor/live-view/src/test/resources/spp-test-probe.yml b/platform/processor/live-view/src/test/resources/spp-test-probe.yml index c281edd85..d64f98fed 100644 --- a/platform/processor/live-view/src/test/resources/spp-test-probe.yml +++ b/platform/processor/live-view/src/test/resources/spp-test-probe.yml @@ -4,6 +4,11 @@ spp: client_secret: "test-secret" platform_host: "localhost" platform_port: 12800 + application: + environment: ${SPP_APPLICATION_ENVIRONMENT:-} + version: ${SPP_APPLICATION_VERSION:-} + git_commit: ${SPP_APPLICATION_GIT_COMMIT:-} + skywalking: logging: level: "DEBUG" diff --git a/probes/jvm b/probes/jvm index deab9f234..fdb4d7375 160000 --- a/probes/jvm +++ b/probes/jvm @@ -1 +1 @@ -Subproject commit deab9f234a1c96bd678641cf7960eac6c839b33e +Subproject commit fdb4d7375a79ac58f001c3a71c0a622590223dde diff --git a/probes/nodejs b/probes/nodejs index 9c75f23c2..6570686b2 160000 --- a/probes/nodejs +++ b/probes/nodejs @@ -1 +1 @@ -Subproject commit 9c75f23c2ee4e69561cf6ede93a6f0e6661321f6 +Subproject commit 6570686b2bfe26f5afe9fd2d29835e96b0883e6f diff --git a/probes/python b/probes/python index 1879c2fa2..f65899c19 160000 --- a/probes/python +++ b/probes/python @@ -1 +1 @@ -Subproject commit 1879c2fa2c0c054f678ebc65f9f6f0ecea1ac244 +Subproject commit f65899c19775b851c214e33e7557289300de1465 diff --git a/protocol b/protocol index 89c115bf8..68a6a9298 160000 --- a/protocol +++ b/protocol @@ -1 +1 @@ -Subproject commit 89c115bf84cf9ce000213bc7639fa75862be78f3 +Subproject commit 68a6a9298dca71f127af6d694b92d155b7beb14c