-
-
Notifications
You must be signed in to change notification settings - Fork 91
Open
Labels
enhancement (New feature or request)
Description
Version
3.0.0-RC
Environment that reproduces the issue
Emulator
Use case description
I want to restream a local RTSP stream (CCTV camera) via SRT to the cloud to make it more reliable.
Proposed solution
I read the advanced docs (https://github.com/ThibaultBee/StreamPack/blob/main/docs/AdvancedStreamer.md) and combined them with https://github.com/alexeyvasilyev/rtsp-client-android to get a low-latency stream. I want to display the stream and forward it via SRT.
So far I have built a custom class for this:
import android.net.Uri
import android.view.Surface
import com.alexvas.rtsp.codec.VideoDecodeThread.DecoderType
import com.alexvas.rtsp.codec.VideoDecoderSurfaceThread
import com.alexvas.rtsp.widget.RtspProcessor
import io.github.thibaultbee.streampack.core.elements.processing.video.source.DefaultSourceInfoProvider
import io.github.thibaultbee.streampack.core.elements.processing.video.source.ISourceInfoProvider
import io.github.thibaultbee.streampack.core.elements.sources.video.ISurfaceSourceInternal
import io.github.thibaultbee.streampack.core.elements.sources.video.IVideoSourceInternal
import io.github.thibaultbee.streampack.core.elements.sources.video.VideoSourceConfig
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.withContext
/**
* Minimal head-less RTSP client that decodes *video-only* into the encoder Surface.
*
* Scope: H.264/H.265 over TCP, no audio, low-latency.
*/
class RtspClientSurfaceSource(
val rtspUri: Uri,
private val username: String? = null,
private val password: String? = null,
private val userAgent : String? = "StreamPack-RtspClient"
) : IVideoSourceInternal, ISurfaceSourceInternal {
/* ---------- required StreamPack flows ---------- */
private val _infoProviderFlow = MutableStateFlow<ISourceInfoProvider>(DefaultSourceInfoProvider())
override val infoProviderFlow: StateFlow<ISourceInfoProvider> = _infoProviderFlow
private val _isStreaming = MutableStateFlow(false)
override val isStreamingFlow: StateFlow<Boolean> = _isStreaming
override val timestampOffsetInNs = 0L
/* ---------- rtsp-client objects ---------- */
private var encoderSurface : Surface? = null
private var rtspProcessor : RtspProcessor? = null
/* ---------------------------------------------------------------------- */
/**
 * Accepts the video source configuration without acting on it.
 *
 * Nothing is built here: the RTSP processor is created in [startStream], after the
 * encoder surface has been supplied through [setOutput].
 *
 * @param config the requested video source configuration (unused).
 */
override suspend fun configure(config: VideoSourceConfig) = Unit
/**
 * Remembers the surface that decoded frames should be rendered into.
 *
 * @param surface the surface handed to the video decoder when [startStream] runs.
 */
override suspend fun setOutput(surface: Surface) {
    encoderSurface = surface
}
/**
 * Returns the surface currently registered as output.
 *
 * @return the stored surface, or `null` when none is set.
 */
override suspend fun getOutput(): Surface? {
    return encoderSurface
}
/** Forgets the output surface; [startStream] fails until [setOutput] is called again. */
override suspend fun resetOutput() {
    encoderSurface = null
}
/**
 * Opens the RTSP session and starts decoding video into the output surface.
 *
 * The processor is created, initialised and started on [Dispatchers.Main].immediate,
 * requesting video only. The streaming flag is raised once that has completed.
 *
 * @throws IllegalStateException if the source is already streaming, or if no output
 * surface has been set.
 */
override suspend fun startStream() {
    check(!_isStreaming.value) { "already started" }
    val target = checkNotNull(encoderSurface) { "setOutput() must be called first" }

    // rtsp-client calls are made on the main thread.
    withContext(Dispatchers.Main.immediate) {
        val processor = RtspProcessor(
            onVideoDecoderCreateRequested = { mimeType, rotation, frameQueue, decoderListener, _ ->
                // 1920x1080 is a placeholder size; per the original author's note the
                // codec derives the real dimensions from the SPS.
                VideoDecoderSurfaceThread(
                    target,
                    mimeType,
                    1920,
                    1080,
                    rotation,
                    frameQueue,
                    decoderListener,
                    DecoderType.HARDWARE
                )
            }
        )
        processor.init(rtspUri, username, password, userAgent)
        // Video only: audio is never requested by this source.
        processor.start(requestVideo = true, requestAudio = false)
        // Published only after init/start succeeded.
        rtspProcessor = processor
    }

    _isStreaming.value = true
}
/**
 * Stops the RTSP session and marks this source as no longer streaming.
 *
 * The processor is stopped on [Dispatchers.Main].immediate, the same dispatcher
 * [startStream] uses to create and start it. The hop runs under [NonCancellable] so the
 * teardown still happens when the calling coroutine has already been cancelled.
 * The streaming flag is reset even if stopping the processor throws.
 *
 * Calling this when nothing is running only resets the flag.
 */
override suspend fun stopStream() {
    try {
        withContext(NonCancellable + Dispatchers.Main.immediate) {
            rtspProcessor?.stop()
            // Drop the reference so a later release() does not stop the same processor again.
            rtspProcessor = null
        }
    } finally {
        _isStreaming.value = false
    }
}
/**
 * Releases everything held by this source.
 *
 * Stops the processor if one exists, then drops the processor and output-surface
 * references and resets the streaming flag. The cleanup runs even if stopping the
 * processor throws, so the source never stays in a "streaming" state after release.
 *
 * NOTE(review): this function is not suspending, so the processor is stopped on the
 * caller's thread. Confirm whether rtsp-client requires the main thread here.
 */
override fun release() {
    try {
        rtspProcessor?.stop()
    } finally {
        rtspProcessor = null
        encoderSurface = null
        // Keeps isStreamingFlow from reporting true after a mid-stream release.
        _isStreaming.value = false
    }
}
}
The error I get from this is:
/home/runner/work/srtdroid/srtdroid/srtdroid-core/.cxx/RelWithDebInfo/6h4y1g40/arm64-v8a/srt_project-prefix/src/srt_project/srtcore/core.cpp@6868:sendmsg2 08:21:54.643737/DefaultDispatch*E:SRT.as: @854765524: Wrong source time was provided. Sending is rejected.
onStreamError: java.net.SocketException: Operation not supported: Incorrect use of Message API
io.github.thibaultbee.streampack.core.elements.endpoints.composites.sinks.ClosedException: java.net.SocketException: Operation not supported: Incorrect use of Message API (sendmsg/recvmsg)
at io.github.thibaultbee.streampack.ext.srt.internal.endpoints.composites.sinks.SrtSink.write(SrtSink.kt:140)
at io.github.thibaultbee.streampack.ext.srt.internal.endpoints.composites.sinks.SrtSink$write$1.invokeSuspend(Unknown Source:15)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:98)
at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:263)
at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:95)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:69)
at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source:1)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:47)
at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source:1)
at io.github.thibaultbee.streampack.core.elements.endpoints.composites.CompositeEndpoint$1.onOutputFrame(CompositeEndpoint.kt:56)
at io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxers.ts.utils.TSOutputCallback.writePacket(TSOutputCallback.kt:24)
at io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxers.ts.packets.TS.write(TS.kt:133)
at io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxers.ts.packets.Pes.write(Pes.kt:51)
at io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxers.ts.TsMuxer.generateStreams(TsMuxer.kt:161)
at io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxers.ts.TsMuxer.write(TsMuxer.kt:148)
at io.github.thibaultbee.streampack.core.elements.endpoints.composites.CompositeEndpoint.write(CompositeEndpoint.kt:77)
at io.github.thibaultbee.streampack.core.elements.endpoints.DynamicEndpoint.write$suspendImpl(DynamicEndpoint.kt:101)
at io.github.thibaultbee.streampack.core.elements.endpoints.DynamicEndpoint.write(Unknown Source:0)
at io.github.thibaultbee.streampack.core.pipelines.outputs.encoding.EncodingPipelineOutput$videoEncoderListener$1$onOutputFrame$1$1.invokeSuspend(EncodingPipelineOutput.kt:244)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:100)
at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:263)
at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:95)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:69)
at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source:1)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:47)
at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source:1)
at io.github.thibaultbee.streampack.core.pipelines.outputs.encoding.EncodingPipelineOutput$videoEncoderListener$1.onOutputFrame(EncodingPipelineOutput.kt:243)
at io.github.thibaultbee.streampack.core.elements.encoders.mediacodec.MediaCodecEncoder.processOutputFrameSync$lambda$9(MediaCodecEncoder.kt:371)
at io.github.thibaultbee.streampack.core.elements.encoders.mediacodec.MediaCodecEncoder.$r8$lambda$JfRWavPzZuJnPtH_AZ7qsi0CZp8(Unknown Source:0)
at io.github.thibaultbee.streampack.core.elements.encoders.mediacodec.MediaCodecEncoder$$ExternalSyntheticLambda5.run(D8$$SyntheticClass:0)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:644)
at java.lang.Thread.run(Thread.java:1012)
Caused by: java.net.SocketException: Operation not supported: Incorrect use of Message API (sendmsg/recvmsg)
at io.github.thibaultbee.srtdroid.core.models.SrtSocket.send(SrtSocket.kt:684)
at io.github.thibaultbee.srtdroid.ktx.CoroutineSrtSocket.send$lambda$4(CoroutineSrtSocket.kt:389)
at io.github.thibaultbee.srtdroid.ktx.CoroutineSrtSocket.$r8$lambda$R-v6WsWB6ftIkiuK0jEZhbCNRms(Unknown Source:0)
at io.github.thibaultbee.srtdroid.ktx.CoroutineSrtSocket$$ExternalSyntheticLambda3.invoke(D8$$SyntheticClass:0)
at io.github.thibaultbee.srtdroid.ktx.CoroutineSrtSocket.executeEpoll(CoroutineSrtSocket.kt:866)
at io.github.thibaultbee.srtdroid.ktx.CoroutineSrtSocket.executeEpollWithTimeout(CoroutineSrtSocket.kt:819)
at io.github.thibaultbee.srtdroid.ktx.CoroutineSrtSocket.access$executeEpollWithTimeout(CoroutineSrtSocket.kt:48)
at io.github.thibaultbee.srtdroid.ktx.CoroutineSrtSocket$execute$3.invokeSuspend(CoroutineSrtSocket.kt:797)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
Any ideas what to do?
Alternative solutions
ChatGPT suggested the following solutions, but they appear to be incorrect: A is a hallucination, and I am not sure whether B is valid or whether it addresses the real root cause.
| Path | What it does | How to apply |
|---|---|---|
| A. Turn off the Message-API | StreamPack will call the classic send() instead of sendmsg2; no timestamps are sent, so SRT can’t complain. | Create the SrtMediaDescriptor with messageApi = false (newer StreamPack versions default to true). |
| B. Keep Message-API but send valid timestamps | You’d patch StreamPack to set srcTime = SRT.timeNow() for every packet. | Requires editing the library (not recommended just to unblock you). |
Metadata
Metadata
Assignees
Labels
enhancement (New feature or request)