Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ class ConnectionPool<T : ConcreteConnection>(

fun giveBack(item: T): CompletableFuture<AsyncObjectPool<T>> = objectPool.giveBack(item)

fun softEvictConnections(): CompletableFuture<AsyncObjectPool<T>> = objectPool.softEvict()

/**
*
* Closes the pool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,15 @@ internal constructor(
return future
}

override fun softEvict(): CompletableFuture<AsyncObjectPool<T>> {
val future = CompletableFuture<Unit>()
val offered = actor.trySend(SoftEvictAll(future)).isSuccess
if (!offered) {
future.completeExceptionally(Exception("could not offer to actor"))
}
return future.map { this }
}

override fun giveBack(item: T): CompletableFuture<AsyncObjectPool<T>> {
val future = CompletableFuture<Unit>()
val offered = actor.trySend(GiveBack(item, future)).isSuccess
Expand Down Expand Up @@ -166,6 +175,7 @@ private sealed class ActorObjectPoolMessage<T : PooledObject> {
}

private class Take<T : PooledObject>(val future: CompletableFuture<T>) : ActorObjectPoolMessage<T>()
private class SoftEvictAll<T : PooledObject>(val future: CompletableFuture<Unit>) : ActorObjectPoolMessage<T>()
private class GiveBack<T : PooledObject>(
val returnedItem: T,
val future: CompletableFuture<Unit>,
Expand All @@ -183,7 +193,8 @@ private class GiveBack<T : PooledObject>(
private class Created<T : PooledObject>(
val itemCreateId: Int,
val item: Try<T>,
val takeAskFuture: CompletableFuture<T>?
val takeAskFuture: CompletableFuture<T>?,
val objectHolder: ObjectHolder<CompletableFuture<out T>>
) : ActorObjectPoolMessage<T>() {
override fun toString(): String {
val id = when (item) {
Expand Down Expand Up @@ -227,6 +238,7 @@ private class ObjectPoolActor<T : PooledObject>(
when (message) {
is Take<T> -> handleTake(message)
is GiveBack<T> -> handleGiveBack(message)
is SoftEvictAll<T> -> handleSoftEvictAll(message)
is Created<T> -> handleCreated(message)
is TestPoolItems<T> -> handleTestPoolItems()
is Close<T> -> handleClose(message)
Expand All @@ -235,6 +247,19 @@ private class ObjectPoolActor<T : PooledObject>(
scheduleNewItemsIfNeeded()
}

private fun handleSoftEvictAll(message: SoftEvictAll<T>) {
evictAvailableItems()
inUseItems.values.forEach { it.markForEviction = true }
inCreateItems.entries.forEach { it.value.markForEviction = true }
logger.trace { "handleSoftEvictAll - done" }
message.future.complete(Unit)
}

private fun evictAvailableItems() {
availableItems.forEach { it.item.destroy() }
availableItems.clear()
}

private fun scheduleNewItemsIfNeeded() {
logger.trace { "scheduleNewItemsIfNeeded - $poolStatusString" }
// deal with inconsistency in case we have items but also waiting futures
Expand All @@ -247,7 +272,7 @@ private class ObjectPoolActor<T : PooledObject>(
return
}
}
// deal with inconsistency in case we have waiting futures, and but we can create new items for them
// deal with inconsistency in case we have waiting futures, but we can create new items for them
while (availableItems.isEmpty() &&
waitingQueue.isNotEmpty() &&
totalItems < configuration.maxObjects &&
Expand All @@ -273,8 +298,7 @@ private class ObjectPoolActor<T : PooledObject>(
try {
closed = true
channel.close()
availableItems.forEach { it.item.destroy() }
availableItems.clear()
evictAvailableItems()
inUseItems.forEach {
it.value.cleanedByPool = true
it.key.destroy()
Expand Down Expand Up @@ -368,10 +392,12 @@ private class ObjectPoolActor<T : PooledObject>(
logger.trace { "releasing idle item ${item.id}" }
item.destroy()
}

configuration.maxObjectTtl != null && System.currentTimeMillis() - item.creationTime > configuration.maxObjectTtl -> {
logger.trace { "releasing item past ttl ${item.id}" }
item.destroy()
}

else -> {
val test = objectFactory.test(item)
inUseItems[item] = ItemInUseHolder(item.id, isInTest = true, testFuture = test)
Expand Down Expand Up @@ -411,7 +437,7 @@ private class ObjectPoolActor<T : PooledObject>(
is Failure -> future.completeExceptionally(message.item.exception)
is Success -> {
try {
message.item.value.borrowTo(future)
message.item.value.borrowTo(future, markForEviction = message.objectHolder.markForEviction)
} catch (e: Exception) {
future.completeExceptionally(e)
}
Expand All @@ -420,11 +446,11 @@ private class ObjectPoolActor<T : PooledObject>(
}
}

private fun T.borrowTo(future: CompletableFuture<T>, validate: Boolean = true) {
private fun T.borrowTo(future: CompletableFuture<T>, validate: Boolean = true, markForEviction: Boolean = false,) {
if (validate) {
validate(this)
}
inUseItems[this] = ItemInUseHolder(this.id, isInTest = false)
inUseItems[this] = ItemInUseHolder(this.id, isInTest = false, markForEviction = markForEviction)
logger.trace { "borrowed: ${this.id} ; $poolStatusString" }
future.complete(this)
}
Expand All @@ -450,6 +476,11 @@ private class ObjectPoolActor<T : PooledObject>(
}
validate(message.returnedItem)
message.future.complete(Unit)
if (removed.markForEviction) {
logger.trace { "GiveBack got item ${message.returnedItem.id} marked for eviction, so destroying it" }
message.returnedItem.destroy()
return
}
if (waitingQueue.isEmpty()) {
if (availableItems.any { holder -> message.returnedItem === holder.item }) {
logger.warn { "trying to give back an item to the pool twice ${message.returnedItem.id}, will ignore that" }
Expand Down Expand Up @@ -533,10 +564,11 @@ private class ObjectPoolActor<T : PooledObject>(
val created = objectFactory.create()
val itemCreateId = createIndex
createIndex++
inCreateItems[itemCreateId] = ObjectHolder(created)
val objectHolder = ObjectHolder(created)
inCreateItems[itemCreateId] = objectHolder
logger.trace { "createObject createRequest=$itemCreateId" }
created.onComplete { tried ->
offerOrLog(Created(itemCreateId, tried, future)) {
offerOrLog(Created(itemCreateId, tried, future, objectHolder)) {
"failed to offer on created item $itemCreateId"
}
}
Expand All @@ -558,9 +590,11 @@ private open class PoolObjectHolder<T : PooledObject>(
val timeElapsed: Long get() = System.currentTimeMillis() - time
}

private class ObjectHolder<T : Any>(val item: T) {
private class ObjectHolder<T : Any>(
val item: T,
var markForEviction: Boolean = false,
) {
val time = System.currentTimeMillis()

val timeElapsed: Long get() = System.currentTimeMillis() - time
}

Expand All @@ -569,12 +603,13 @@ private data class ItemInUseHolder<T : PooledObject>(
val isInTest: Boolean,
val testFuture: CompletableFuture<T>? = null,
val time: Long = System.currentTimeMillis(),
var cleanedByPool: Boolean = false
var cleanedByPool: Boolean = false,
var markForEviction: Boolean = false,

) {
val timeElapsed: Long get() = System.currentTimeMillis() - time

@Suppress("unused", "ProtectedInFinal")
@Suppress("unused")
protected fun finalize() {
if (!cleanedByPool) {
logger.warn { "LEAK DETECTED for item $this - $timeElapsed ms since in use" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ interface AsyncObjectPool<T> {

fun giveBack(item: T): CompletableFuture<AsyncObjectPool<T>>

/**
* Mark all objects in the pool as invalid. Objects will be evicted when not in use.
*/
fun softEvict(): CompletableFuture<AsyncObjectPool<T>>

/**
*
* Closes this pool and future calls to **take** will cause the Future to raise an
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,41 @@ class ActorBasedObjectPoolTest {
assertThat(factory.validated).isEqualTo(listOf(result, result, result))
}

@Test
fun `softEviction - basic take-evict-return pool should be empty`() {
tested = createDefaultPool()
val result = tested.take().get()
tested.softEvict().get()
tested.giveBack(result).get()
assertThat(tested.availableItems).isEmpty()
}

@Test
fun `softEviction - minimum number of objects is maintained, but objects are replaced`() {
tested = ActorBasedObjectPool(
factory,
configuration.copy(minIdleObjects = 3),
false
)
tested.take().get()
await.untilCallTo { tested.availableItemsSize } matches { it == 3 }
val availableItems = tested.availableItems
tested.softEvict().get()
await.untilCallTo { tested.availableItemsSize } matches { it == 3 }
assertThat(tested.availableItems.toSet().intersect(availableItems.toSet())).isEmpty()
}

@Test
fun `test for objects in create eviction in case of softEviction`() {
factory.creationStuckTime = 10
tested = createDefaultPool()
val itemPromise = tested.take()
tested.softEvict().get()
val item = itemPromise.get()
tested.giveBack(item).get()
assertThat(tested.availableItemsSize).isEqualTo(0)
}

@Test
fun `take2-return2-take first not validated second is ok should be returned`() {
tested = createDefaultPool()
Expand Down