Skip to content

[Feat]: RTSP source #217

@perotom

Description

@perotom

Version

3.0.0-RC

Environment that reproduces the issue

Emulator

Use case description

I want to restream a local RTSP stream (CCTV camera) via SRT to the cloud to make it more reliable.

Proposed solution

I read the advanced docs (https://github.com/ThibaultBee/StreamPack/blob/main/docs/AdvancedStreamer.md) and combined it with https://github.com/alexeyvasilyev/rtsp-client-android to get low latency stream. I want to display the stream and forward it via SRT.

So far I have built a custom class for this:

import android.net.Uri
import android.view.Surface
import com.alexvas.rtsp.codec.VideoDecodeThread.DecoderType
import com.alexvas.rtsp.codec.VideoDecoderSurfaceThread
import com.alexvas.rtsp.widget.RtspProcessor
import io.github.thibaultbee.streampack.core.elements.processing.video.source.DefaultSourceInfoProvider
import io.github.thibaultbee.streampack.core.elements.processing.video.source.ISourceInfoProvider
import io.github.thibaultbee.streampack.core.elements.sources.video.ISurfaceSourceInternal
import io.github.thibaultbee.streampack.core.elements.sources.video.IVideoSourceInternal
import io.github.thibaultbee.streampack.core.elements.sources.video.VideoSourceConfig
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.withContext

/**
 *  Minimal head-less RTSP client that decodes *video-only* into the encoder Surface.
 *
 *  Scope: H.264/H.265 over TCP, no audio, low-latency.
 */
class RtspClientSurfaceSource(
    val rtspUri: Uri,
    private val username: String? = null,
    private val password: String? = null,
    private val userAgent : String? = "StreamPack-RtspClient"
) : IVideoSourceInternal, ISurfaceSourceInternal {

    /* ---------- required StreamPack flows ---------- */
    private val _infoProviderFlow    = MutableStateFlow<ISourceInfoProvider>(DefaultSourceInfoProvider())
    override val infoProviderFlow: StateFlow<ISourceInfoProvider> = _infoProviderFlow

    private val _isStreaming         = MutableStateFlow(false)
    override val isStreamingFlow: StateFlow<Boolean>              = _isStreaming

    override val timestampOffsetInNs = 0L

    /* ---------- rtsp-client objects ---------- */
    private var encoderSurface : Surface?        = null
    private var rtspProcessor  : RtspProcessor?  = null

    /* ---------------------------------------------------------------------- */
    override suspend fun configure(config: VideoSourceConfig) {
        /* no-op – we create the processor lazily in startStream() once the
           encoder Surface has been provided via setOutput().                */
    }

    override suspend fun setOutput(surface: Surface) { encoderSurface = surface }

    override suspend fun getOutput(): Surface?     = encoderSurface
    override suspend fun resetOutput()             { encoderSurface = null }

    override suspend fun startStream() {
        check(! _isStreaming.value)          { "already started" }
        val surface = encoderSurface ?: error("setOutput() must be called first")

        // All rtsp-client operations must run on the main thread
        withContext(Dispatchers.Main.immediate) {
            rtspProcessor = RtspProcessor(
                onVideoDecoderCreateRequested = { mime, rot, queue, listener, _ ->
                    VideoDecoderSurfaceThread(
                        surface, mime,
                        /* dummy size – codec derives from SPS */ 1920, 1080,
                        rot, queue, listener, DecoderType.HARDWARE
                    )
                }
            ).apply {
                // absolutely *video only* for this source
                init(rtspUri, username, password, userAgent)
                start(requestVideo = true, requestAudio = false)
            }
        }
        _isStreaming.value = true
    }

    override suspend fun stopStream() {
        rtspProcessor?.stop()
        _isStreaming.value = false
    }

    override fun release() {
        rtspProcessor?.stop()
        rtspProcessor = null
        encoderSurface = null
    }
}

The error I get from this is:

/home/runner/work/srtdroid/srtdroid/srtdroid-core/.cxx/RelWithDebInfo/6h4y1g40/arm64-v8a/srt_project-prefix/src/srt_project/srtcore/core.cpp@6868:sendmsg2 08:21:54.643737/DefaultDispatch*E:SRT.as: @854765524: Wrong source time was provided. Sending is rejected.


onStreamError: java.net.SocketException: Operation not supported: Incorrect use of Message API
	io.github.thibaultbee.streampack.core.elements.endpoints.composites.sinks.ClosedException: java.net.SocketException: Operation not supported: Incorrect use of Message API (sendmsg/recvmsg)
		at io.github.thibaultbee.streampack.ext.srt.internal.endpoints.composites.sinks.SrtSink.write(SrtSink.kt:140)
		at io.github.thibaultbee.streampack.ext.srt.internal.endpoints.composites.sinks.SrtSink$write$1.invokeSuspend(Unknown Source:15)
		at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
		at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:98)
		at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:263)
		at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:95)
		at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:69)
		at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source:1)
		at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:47)
		at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source:1)
		at io.github.thibaultbee.streampack.core.elements.endpoints.composites.CompositeEndpoint$1.onOutputFrame(CompositeEndpoint.kt:56)
		at io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxers.ts.utils.TSOutputCallback.writePacket(TSOutputCallback.kt:24)
		at io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxers.ts.packets.TS.write(TS.kt:133)
		at io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxers.ts.packets.Pes.write(Pes.kt:51)
		at io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxers.ts.TsMuxer.generateStreams(TsMuxer.kt:161)
		at io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxers.ts.TsMuxer.write(TsMuxer.kt:148)
		at io.github.thibaultbee.streampack.core.elements.endpoints.composites.CompositeEndpoint.write(CompositeEndpoint.kt:77)
		at io.github.thibaultbee.streampack.core.elements.endpoints.DynamicEndpoint.write$suspendImpl(DynamicEndpoint.kt:101)
		at io.github.thibaultbee.streampack.core.elements.endpoints.DynamicEndpoint.write(Unknown Source:0)
		at io.github.thibaultbee.streampack.core.pipelines.outputs.encoding.EncodingPipelineOutput$videoEncoderListener$1$onOutputFrame$1$1.invokeSuspend(EncodingPipelineOutput.kt:244)
		at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
		at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:100)
		at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:263)
		at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:95)
		at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:69)
		at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source:1)
		at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:47)
		at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source:1)
		at io.github.thibaultbee.streampack.core.pipelines.outputs.encoding.EncodingPipelineOutput$videoEncoderListener$1.onOutputFrame(EncodingPipelineOutput.kt:243)
		at io.github.thibaultbee.streampack.core.elements.encoders.mediacodec.MediaCodecEncoder.processOutputFrameSync$lambda$9(MediaCodecEncoder.kt:371)
		at io.github.thibaultbee.streampack.core.elements.encoders.mediacodec.MediaCodecEncoder.$r8$lambda$JfRWavPzZuJnPtH_AZ7qsi0CZp8(Unknown Source:0)
		at io.github.thibaultbee.streampack.core.elements.encoders.mediacodec.MediaCodecEncoder$$ExternalSyntheticLambda5.run(D8$$SyntheticClass:0)
		at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
		at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:644)
		at java.lang.Thread.run(Thread.java:1012)
Caused by: java.net.SocketException: Operation not supported: Incorrect use of Message API (sendmsg/recvmsg)
	at io.github.thibaultbee.srtdroid.core.models.SrtSocket.send(SrtSocket.kt:684)
	at io.github.thibaultbee.srtdroid.ktx.CoroutineSrtSocket.send$lambda$4(CoroutineSrtSocket.kt:389)
	at io.github.thibaultbee.srtdroid.ktx.CoroutineSrtSocket.$r8$lambda$R-v6WsWB6ftIkiuK0jEZhbCNRms(Unknown Source:0)
	at io.github.thibaultbee.srtdroid.ktx.CoroutineSrtSocket$$ExternalSyntheticLambda3.invoke(D8$$SyntheticClass:0)
	at io.github.thibaultbee.srtdroid.ktx.CoroutineSrtSocket.executeEpoll(CoroutineSrtSocket.kt:866)
	at io.github.thibaultbee.srtdroid.ktx.CoroutineSrtSocket.executeEpollWithTimeout(CoroutineSrtSocket.kt:819)
	at io.github.thibaultbee.srtdroid.ktx.CoroutineSrtSocket.access$executeEpollWithTimeout(CoroutineSrtSocket.kt:48)
	at io.github.thibaultbee.srtdroid.ktx.CoroutineSrtSocket$execute$3.invokeSuspend(CoroutineSrtSocket.kt:797)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)

Any ideas what to do?

Alternative solutions

ChatGPT told me the following solutions, although they are incorrect. A. is a hallucination. Not sure about B. and if this is really the root cause.

Path What it does How to apply
A. Turn off the Message-API StreamPack will call the classic send() instead of sendmsg2; no timestamps are sent, so SRT can’t complain. Create the SrtMediaDescriptor with messageApi = false (newer StreamPack versions default to true).
B. Keep Message-API but send valid timestamps You’d patch StreamPack to set srcTime = SRT.timeNow() for every packet. Requires editing the library (not recommended just to unblock you).

Metadata

Metadata

Assignees

No one assigned

    Labels

    enhancementNew feature or request

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions