Jakub Kubík
05/03/2023, 11:07 AM
Sender.
We implement a custom buffering wrapper over it, and in certain situations, for a certain number of rows, we write them and then call .flush() after each one. We are familiar with the performance ramifications of this.
The expected result is that each of the rows gets written to the target table, albeit a bit slower than usual.
What actually happens is that only a couple of rows out of ~1000 get written, without any exception being thrown.
Has anyone here come across similar behaviour?
Andrey Pechkurov
05/03/2023, 11:20 AM
E or C log messages).
Jakub Kubík
05/03/2023, 11:32 AM
BufferedSender, which takes care of reliably writing data (possibly in the wrong format) to QuestDB.
If it comes across a malformed row, it should still write all the good rows in the buffer and continue as usual.
class BufferedSender(private val env: QuestDBEnvironment) : ILoggable {
    data class BufferedTimeSeriesPoint(
        val tableName: String,
        val point: TimeSeriesPoint,
        val at: Instant
    )

    private val buffer = mutableListOf<BufferedTimeSeriesPoint>()
    private val maxBufferSize = env.bufferSize
    private var sender: Sender = buildSender()

    companion object : ILoggable {
        private val logger = logger()
    }

    private fun Instant.toEpochMicro(): Long {
        return ChronoUnit.MICROS.between(Instant.EPOCH, this)
    }

    private fun Instant.toEpochNano(): Long {
        return ChronoUnit.NANOS.between(Instant.EPOCH, this)
    }

    private fun buildSender() = Sender
        .builder()
        .address("${env.host}:${env.writePort}")
        .bufferCapacity(Int.MAX_VALUE)
        .build()

    private fun writePoint(bufferedPoint: BufferedTimeSeriesPoint) {
        val row = sender.table(bufferedPoint.tableName)
        val point = bufferedPoint.point
        point.tags.forEach { row.symbol(it.key, it.value) }
        point.booleanFields.forEach { row.boolColumn(it.key, it.value) }
        point.longFields.forEach { row.longColumn(it.key, it.value) }
        point.decimalFields.forEach { row.doubleColumn(it.key, it.value.toDouble()) }
        point.stringFields.forEach { row.stringColumn(it.key, it.value) }
        point.datetimeFields.forEach { row.timestampColumn(it.key, it.value.toEpochMicro()) }
        row.at(bufferedPoint.at.toEpochNano())
    }

    private fun flushIfNecessary() {
        if (buffer.size < maxBufferSize)
            return
        sender.flush()
        buffer.clear()
    }

    private fun recover() {
        reconnect()
        buffer.forEach {
            try {
                writePoint(it)
                sender.flush()
            } catch (e: LineSenderException) {
                logger.error("Unable to send row: ${it.point}", e)
                reconnect()
            }
        }
        buffer.clear()
    }

    private fun reconnect() {
        try {
            sender.close()
        } catch (_: LineSenderException) {}
        sender = buildSender()
    }

    fun write(tableName: String, point: TimeSeriesPoint, now: Instant) {
        val bufferedPoint = BufferedTimeSeriesPoint(
            tableName,
            point,
            now
        )
        try {
            buffer.add(bufferedPoint)
            writePoint(bufferedPoint)
            flushIfNecessary()
        } catch (e: LineSenderException) {
            logger.error("An exception was caught while inserting a point into QuestDB: $point", e)
            recover()
        }
    }

    fun close() {
        sender.close()
    }
}
recover() method, which (as I previously stated) should send the valid rows, but sends only a small number of them and the rest are lost.
Andrey Pechkurov
05/03/2023, 12:52 PM
Jaromir Hamala
05/03/2023, 1:20 PM
sender.flush().
• If the flush() call is successful, then you treat the points as successfully written and you clear the buffer.
• If flush() throws an exception, then you start iterating over the local buffer and call flush() after each point, skipping points which throw an exception on flush().
I’m afraid this whole idea is based on a wrong assumption: you are assuming that if you write a bad line/point and call flush(), then the flush call will throw an exception. That’s actually not the case. Flush can succeed even when a line cannot be accepted by the server (a wrong schema, etc.). In theory it can succeed even when a TCP connection is broken (while the client TCP/IP stack does not know it yet).
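The point about flush() can be illustrated without QuestDB at all. The following is a minimal sketch using plain java.net sockets, not the QuestDB client: the listening socket stands in for a server that never reads or accepts the data, and the function name and the ILP-like payload are made up for illustration. The write and flush still return normally, because the bytes only have to reach the local OS send buffer.

```kotlin
import java.net.InetAddress
import java.net.ServerSocket
import java.net.Socket

// Hypothetical helper: returns true if write + flush complete without an exception,
// even though nothing on the other side ever reads the data.
fun flushSucceedsWithoutReader(): Boolean {
    // Port 0 asks the OS for any free port. The socket listens but never calls accept().
    ServerSocket(0, 50, InetAddress.getLoopbackAddress()).use { server ->
        // The TCP handshake is completed by the kernel, so connecting succeeds.
        Socket(InetAddress.getLoopbackAddress(), server.localPort).use { client ->
            val out = client.getOutputStream()
            // A deliberately bad, ILP-looking line; nobody validates it here.
            out.write("trades,symbol=BTC price=not_a_number\n".toByteArray())
            out.flush() // returns normally: the bytes were handed to the OS
            return true
        }
    }
}

fun main() {
    println(flushSucceedsWithoutReader())
}
```

A successful flush here says nothing about whether the data was read, parsed or stored, which matches the behaviour described above.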
For further troubleshooting I would closely inspect the QuestDB server logs. If that does not show anything suspicious, then I would consult your local Kotlin expert (=totally not me:)) and as a last resort I would fire up Wireshark to see what exactly is going on. The ILP protocol is text-based, so even without a custom dissector it’s easy to see what is going on.
A silly idea: Are you sure you call this from a single thread only?
Jakub Kubík
05/03/2023, 1:27 PM
> I’m afraid this whole idea is based on a wrong assumption: you are assuming that if you write a bad line/point and call flush(), then the flush call will throw an exception. That’s actually not the case. Flush can succeed even when a line cannot be accepted by the server (a wrong schema, etc.). In theory it can succeed even when a TCP connection is broken (while the client TCP/IP stack does not know it yet).
Okay, so is there a mechanism to determine whether a row is malformed and hence will not be accepted by QuestDB?
> For further troubleshooting I would closely inspect the QuestDB server logs. If that does not show anything suspicious, then I would consult your local Kotlin expert (=totally not me:)) and as a last resort I would fire up Wireshark to see what exactly is going on. The ILP protocol is text-based, so even without a custom dissector it’s easy to see what is going on.
I will try and see what's happening using Wireshark, thanks.
> Are you sure you call this from a single thread only?
Yes, I am.
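On the question of spotting malformed rows before they are sent, one possible approach is a check on the application side against a schema the application already knows. The sketch below is hypothetical: ColumnType, mismatchedColumns and the example schema are illustrative names, not part of the QuestDB client, and it assumes that columns missing from the local schema are acceptable (for example because the server creates them).

```kotlin
// Hypothetical column types the application expects; not a QuestDB API.
enum class ColumnType { BOOLEAN, LONG, DOUBLE, STRING }

// Returns the names of columns whose value type disagrees with the expected schema.
fun mismatchedColumns(expected: Map<String, ColumnType>, row: Map<String, Any>): List<String> =
    row.filter { (name, value) ->
        val actual = when (value) {
            is Boolean -> ColumnType.BOOLEAN
            is Long, is Int -> ColumnType.LONG
            is Double, is Float -> ColumnType.DOUBLE
            is String -> ColumnType.STRING
            else -> null
        }
        // Columns absent from the schema pass (assumption); known ones must match.
        expected[name] != null && expected[name] != actual
    }.keys.toList()

fun main() {
    val schema = mapOf("price" to ColumnType.DOUBLE, "qty" to ColumnType.LONG)
    // "price" is a String here, so it is reported; "qty" matches.
    println(mismatchedColumns(schema, mapOf("price" to "abc", "qty" to 5L)))
}
```

Such a check only catches mismatches against what the application believes the schema to be; it cannot see the actual table definition on the server.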
Jaromir Hamala
05/03/2023, 1:36 PM
> Okay, so is there a mechanism to determine whether a row is malformed and hence will not be accepted by QuestDB?
The ILP client does static checks: it validates that table and column names do not contain any illegal characters, it checks max. length, etc. This happens when a column/table is created, i.e. there is no need to call flush for this. But the client does not do any logical validation such as schema validation: imagine you have a table with an int column and you try to insert a string. The client has no way to check this, so the server will close the connection when processing such a row. This will eventually be propagated to the client, where it manifests as a LineSenderException. We have plans to add more fine-grained feedback to ILP clients, but that’s something for the future. Basically, when flush() succeeds it means only this: your OS TCP/IP stack believes the TCP connection is still alive and there is enough space in the send buffer. Nothing more, nothing less.
Jakub Kubík
05/03/2023, 1:52 PM
Jaromir Hamala
05/03/2023, 2:51 PM