
Furkan Küçük

12/09/2022, 9:53 PM
Hello all, one more question: I am getting an error while using Kafka Connect to consume messages from a remote Redpanda instance. Here is the configuration:
{
  "name": "questdb-sink",
  "config": {
    "connector.class": "io.questdb.kafka.QuestDBSinkConnector",
    "topics": "book,candles,ticker,trades",
    "host": "crypto_questdb",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}
With this configuration, Kafka Connect actually connects, reads just one message from Kafka, and then throws the exception below:
[2022-12-09 21:43:47,413] ERROR WorkerSinkTask{id=questdb-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: [104] send error (org.apache.kafka.connect.runtime.WorkerSinkTask)
io.questdb.cutlass.line.LineSenderException: [104] send error
	at io.questdb.cutlass.line.tcp.PlainTcpLineChannel.send(PlainTcpLineChannel.java:100)
	at io.questdb.cutlass.line.AbstractLineSender.sendAll(AbstractLineSender.java:380)
	at io.questdb.cutlass.line.LineTcpSender.flush(LineTcpSender.java:81)
	at io.questdb.kafka.QuestDBSinkTask.put(QuestDBSinkTask.java:75)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
[2022-12-09 21:43:47,421] WARN WorkerSinkTask{id=questdb-sink-0} Offset commit failed during close (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2022-12-09 21:43:47,421] ERROR WorkerSinkTask{id=questdb-sink-0} Commit of offsets threw an unexpected exception for sequence number 1: null (org.apache.kafka.connect.runtime.WorkerSinkTask)
io.questdb.cutlass.line.LineSenderException: [32] send error
	at io.questdb.cutlass.line.tcp.PlainTcpLineChannel.send(PlainTcpLineChannel.java:100)
	at io.questdb.cutlass.line.AbstractLineSender.sendAll(AbstractLineSender.java:380)
	at io.questdb.cutlass.line.LineTcpSender.flush(LineTcpSender.java:81)
	at io.questdb.kafka.QuestDBSinkTask.flush(QuestDBSinkTask.java:284)
	at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:125)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:404)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:646)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.closeAllPartitions(WorkerSinkTask.java:641)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
[2022-12-09 21:43:47,422] ERROR WorkerSinkTask{id=questdb-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: io.questdb.cutlass.line.LineSenderException: [104] send error
	at io.questdb.cutlass.line.tcp.PlainTcpLineChannel.send(PlainTcpLineChannel.java:100)
	at io.questdb.cutlass.line.AbstractLineSender.sendAll(AbstractLineSender.java:380)
	at io.questdb.cutlass.line.LineTcpSender.flush(LineTcpSender.java:81)
	at io.questdb.kafka.QuestDBSinkTask.put(QuestDBSinkTask.java:75)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
	... 10 more
[2022-12-09 21:43:47,423] WARN Could not stop task (org.apache.kafka.connect.runtime.WorkerSinkTask)
io.questdb.cutlass.line.LineSenderException: [32] send error
	at io.questdb.cutlass.line.tcp.PlainTcpLineChannel.send(PlainTcpLineChannel.java:100)
	at io.questdb.cutlass.line.AbstractLineSender.sendAll(AbstractLineSender.java:380)
	at io.questdb.cutlass.line.LineTcpSender.flush(LineTcpSender.java:81)
	at io.questdb.cutlass.line.AbstractLineSender.close(AbstractLineSender.java:111)
	at io.questdb.kafka.QuestDBSinkTask.stop(QuestDBSinkTask.java:289)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.close(WorkerSinkTask.java:171)
	at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:167)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:199)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
[2022-12-09 21:43:47,426] INFO [Consumer clientId=connector-consumer-questdb-sink-0, groupId=connect-questdb-sink] Revoke previously assigned partitions book-0, candles-0, ticker-0, trades-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-12-09 21:43:47,427] INFO [Consumer clientId=connector-consumer-questdb-sink-0, groupId=connect-questdb-sink] Member connector-consumer-questdb-sink-0-40875dda-9812-476c-8617-52a20314b30f sending LeaveGroup request to coordinator SOME_IP:SOME_PORT (id: 2147483647 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

Jaromir Hamala

12/10/2022, 4:19 PM
Hello, what is in the QuestDB server log?

Furkan Küçük

12/12/2022, 5:53 AM
Hello @Jaromir Hamala, I was trying to reproduce the error. Sorry for the late response. Here is the QuestDB log:
2022-12-12T05:51:48.681936Z I i.q.c.l.t.TableUpdateDetails closing table writer [tableName=candles]
2022-12-12T05:51:48.681945Z I i.q.c.l.t.TableUpdateDetails closing table parsers [tableName=candles]
2022-12-12T05:51:48.681981Z I i.q.c.l.t.TableUpdateDetails closing table parsers [tableName=candles]
2022-12-12T05:51:48.682002Z I i.q.c.l.t.TableUpdateDetails closing table parsers [tableName=candles]
2022-12-12T05:51:48.682095Z I i.q.c.p.WriterPool >> [table=`candles`, thread=17]
2022-12-12T05:51:48.682373Z I i.q.c.l.t.LineTcpMeasurementScheduler assigned candles to thread 0
2022-12-12T05:51:48.682410Z I i.q.c.l.t.TableUpdateDetails network IO thread using table [workerId=2, tableName=candles, nNetworkIoWorkers=1]
2022-12-12T05:51:48.683024Z E i.q.c.l.t.LineTcpConnectionContext [22] could not process line data [table=candles, msg=cast error for line protocol float [columnWriterIndex=4, columnType=LONG, name=volume], errno=0]
2022-12-12T05:51:48.683189Z E i.q.c.l.t.LineTcpConnectionContext [22] could not parse measurement, NONE at 241, line (may be mangled due to partial parsing): 'candles symbol="BNB-USDT",receipt_timestamp=1.6708236602694917E9,start=1670823600i,trades=54i,volume=60.932,high=281.1,stop=1.670823659999E9,low=281i,closed=t,exchange="BINANCE",interval="1m",close=281.1,open=281.1,timestamp=1.670823660003E9'
2022-12-12T05:51:48.683226Z I tcp-line-server scheduling disconnect [fd=22, reason=0]
2022-12-12T05:51:48.683344Z I tcp-line-server disconnected [ip=172.27.0.3, fd=22, src=queue]
2022-12-12T05:51:48.683558Z I i.q.c.l.t.LineTcpWriterJob assigned table to writer thread [tableName=candles, threadId=0]
2022-12-12T05:51:49.559702Z I i.q.c.l.t.TableUpdateDetails network IO thread released table [workerId=2, tableName=candles, nNetworkIoWorkers=0]
2022-12-12T05:51:49.559734Z I i.q.c.l.t.LineTcpMeasurementScheduler active table going idle [tableName=candles]
2022-12-12T05:51:49.566590Z I i.q.c.l.t.LineTcpMeasurementScheduler releasing writer, its been idle since 2022-12-12T05:51:48.682000Z[tableName=candles]
2022-12-12T05:51:49.566666Z I i.q.c.p.WriterPool << [table=`candles`, thread=18]

Jaromir Hamala

12/12/2022, 8:28 AM
hello @Furkan Küçük, did you create the candles table via SQL? it looks like the volume column was created as LONG, but in Kafka it's a floating point number (float or double)

Furkan Küçük

12/12/2022, 8:28 AM
@Jaromir Hamala No, the table is being created by QuestDB's Kafka Connect plugin.

Jaromir Hamala

12/12/2022, 8:31 AM
interesting. how are the messages in Kafka created?
I assume your messages do not contain any schema, right? is it JSON?

Furkan Küçük

12/12/2022, 8:32 AM
@Jaromir Hamala Here is the last message from the same Kafka topic:
{
  "exchange": "BINANCE_FUTURES",
  "symbol": "ETH-USDT-22Z30",
  "start": 1670833800,
  "stop": 1670833859.999,
  "interval": "1m",
  "trades": 70,
  "open": 1245.94,
  "close": 1244.98,
  "high": 1246.11,
  "low": 1244.88,
  "volume": 78.709,
  "closed": true,
  "timestamp": 1670833862.004,
  "receipt_timestamp": 1670833862.3363352
}
This was obtained from kafkacat.
@Jaromir Hamala yeah, it's JSON.

Jaromir Hamala

12/12/2022, 8:33 AM
can you send me a message just before this one? is it possible there is a message where volume is NOT a floating point number, but an integer?

Furkan Küçük

12/12/2022, 8:35 AM
@Jaromir Hamala Wait a minute, I can send you an abundance of those 🙂
@Jaromir Hamala there you go: https://textbin.net/afnqs3o6nw

Jaromir Hamala

12/12/2022, 8:46 AM
ok, I see what is going on. some messages have volume as an integer, like this one:
{"exchange":"BINANCE","symbol":"TRX-BTC","start":1670832960,"stop":1670833019.999,"interval":"1m","trades":3,"open":0.00000308,"close":0.00000309,"high":0.00000309,"low":0.00000308,"volume":2769,"closed":true,"timestamp":1670833020.003,"receipt_timestamp":1670833020.1428316}
"volume":2769
the messages have no schema, so when a message is processed its type is inferred as long, and that is what is sent from Kafka to QuestDB. when the target table does not exist, it is created and the volume column is created as long. but then the connector picks up a message like this:
{
  "exchange": "BINANCE",
  "symbol": "EOS-USDT",
  "start": 1670832960,
  "stop": 1670833019.999,
  "interval": "1m",
  "trades": 2,
  "open": 0.974,
  "close": 0.974,
  "high": 0.974,
  "low": 0.974,
  "volume": 462.1,
  "closed": true,
  "timestamp": 1670833020.003,
  "receipt_timestamp": 1670833020.1456804
}
here the volume is a floating point number, so the connector sends it as float. but the table was already created with volume as long, and you cannot insert a floating point number into a long column -> error. the best you can do is to create the table upfront via SQL, with the volume column as double or float.
does it make sense to you?
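The type flip described above can be reproduced outside the connector. A minimal sketch using Python's stdlib json module as a stand-in, on the assumption that its literal-based typing mirrors the schemaless JsonConverter's inference (this is not the connector's actual code path):

```python
import json

# The same "volume" field parses to different types depending purely on
# how the numeric literal is written in the message.
int_msg = json.loads('{"symbol": "TRX-BTC", "volume": 2769}')
float_msg = json.loads('{"symbol": "EOS-USDT", "volume": 462.1}')

print(type(int_msg["volume"]).__name__)    # int   -> column inferred as long
print(type(float_msg["volume"]).__name__)  # float -> value sent as double
```

So whichever message arrives first when the table is auto-created fixes the column type, and the other variant then fails to insert.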

Furkan Küçük

12/12/2022, 9:04 AM
@Jaromir Hamala yeah, it does make sense. Other than that, if I were to use a schema, would it solve the problem?
If so, can you share a link on using schemas with Kafka Connect?

Jaromir Hamala

12/12/2022, 9:10 AM
the simplest thing is to create the target table in QuestDB manually, via SQL.
alternatively, you can change the format of your messages to avoid type ambiguity. Avro works nicely, and optionally it can use a schema registry too. but these are bigger architectural changes; just creating the target table upfront is probably good enough.
I will add new options to the connector so you will be able to specify columns which should always be sent as floating point types.
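To make the upfront creation concrete, a sketch of what that DDL could look like, using the column names from the sample candles message above. Only volume DOUBLE is the point being made; the remaining types are my guesses at how the connector maps each field, so verify them against your data (and names like timestamp may collide with keywords and need renaming or quoting):

```sql
-- Sketch only: column names follow the sample "candles" message.
-- Declaring volume as DOUBLE up front means integer-looking
-- values (e.g. 2769) are still accepted.
CREATE TABLE candles (
    exchange SYMBOL,
    symbol SYMBOL,
    start LONG,
    stop DOUBLE,
    interval SYMBOL,
    trades LONG,
    open DOUBLE,
    close DOUBLE,
    high DOUBLE,
    low DOUBLE,
    volume DOUBLE,
    closed BOOLEAN,
    timestamp DOUBLE,
    receipt_timestamp DOUBLE
);
```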

Furkan Küçük

12/12/2022, 9:13 AM
@Jaromir Hamala Thanks Jaromir, you were super helpful!

Jaromir Hamala

12/12/2022, 9:13 AM
happy to help!
@Furkan Küçük I’m adding a config option to specify a list of columns which should be sent as a floating point number even when they happen to be integers. this allows tables to be auto-created even when they contain mixed data. see this test: https://github.com/questdb/kafka-questdb-connector/blob/9f8232a7b60fc864b7d0368f48[…]est/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java
this is the same situation as in your case:
connect.kafka().produce(topicName, "key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}");
connect.kafka().produce(topicName, "key", "{\"firstname\":\"Jane\",\"lastname\":\"Doe\",\"age\":42.5}");
in the first message age is an integer and in the 2nd it is a floating point number. normally this would fail, but this line
props.put(QuestDBSinkConnectorConfig.DOUBLE_COLUMNS_CONFIG, "age");
forces the connector to always send age as a floating point number.

Furkan Küçük

12/12/2022, 10:08 AM
@Jaromir Hamala Wow, this was fast. Thanks mate.
@Jaromir Hamala The possible downside is an unnecessarily larger column in terms of disk space, I guess. Is that right?

Jaromir Hamala

12/12/2022, 10:09 AM
it’s not released yet. tests are currently running. if all is green then I should be able to cut a new release within a few hours.
both double and long use 64 bits, so they are equal in this respect. the difference is that double loses precision: you get a wider range of values, but you give up some integer precision.
that’s not QuestDB-specific, that’s how the double type is specified in IEEE 754.
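That precision trade-off is easy to check directly. A minimal Python sketch; the behaviour comes from the IEEE 754 binary64 format itself, not from QuestDB or the connector:

```python
# A binary64 double has a 53-bit significand, so integers above 2**53
# can no longer all be represented exactly.
exact = 2**53          # 9007199254740992, exactly representable
above = 2**53 + 1      # 9007199254740993, NOT representable as a double

print(float(exact) == exact)         # True
print(float(above) == above)         # False: rounds to the nearest double
print(float(above) == float(exact))  # True: both map to 9007199254740992.0
```

For values like trade volumes this loss is usually far below any meaningful digit, which is why storing them as double is normally an acceptable trade for the wider range.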