Furkan Küçük
12/09/2022, 9:53 PM
{
  "name": "questdb-sink",
  "config": {
    "connector.class": "io.questdb.kafka.QuestDBSinkConnector",
    "topics": "book,candles,ticker,trades",
    "host": "crypto_questdb",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}
With this configuration, Kafka Connect actually connects, gets only one message from Kafka, and then throws the exception below:
[2022-12-09 21:43:47,413] ERROR WorkerSinkTask{id=questdb-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: [104] send error (org.apache.kafka.connect.runtime.WorkerSinkTask)
io.questdb.cutlass.line.LineSenderException: [104] send error
at io.questdb.cutlass.line.tcp.PlainTcpLineChannel.send(PlainTcpLineChannel.java:100)
at io.questdb.cutlass.line.AbstractLineSender.sendAll(AbstractLineSender.java:380)
at io.questdb.cutlass.line.LineTcpSender.flush(LineTcpSender.java:81)
at io.questdb.kafka.QuestDBSinkTask.put(QuestDBSinkTask.java:75)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
[2022-12-09 21:43:47,421] WARN WorkerSinkTask{id=questdb-sink-0} Offset commit failed during close (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2022-12-09 21:43:47,421] ERROR WorkerSinkTask{id=questdb-sink-0} Commit of offsets threw an unexpected exception for sequence number 1: null (org.apache.kafka.connect.runtime.WorkerSinkTask)
io.questdb.cutlass.line.LineSenderException: [32] send error
at io.questdb.cutlass.line.tcp.PlainTcpLineChannel.send(PlainTcpLineChannel.java:100)
at io.questdb.cutlass.line.AbstractLineSender.sendAll(AbstractLineSender.java:380)
at io.questdb.cutlass.line.LineTcpSender.flush(LineTcpSender.java:81)
at io.questdb.kafka.QuestDBSinkTask.flush(QuestDBSinkTask.java:284)
at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:125)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:404)
at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:646)
at org.apache.kafka.connect.runtime.WorkerSinkTask.closeAllPartitions(WorkerSinkTask.java:641)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
[2022-12-09 21:43:47,422] ERROR WorkerSinkTask{id=questdb-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: io.questdb.cutlass.line.LineSenderException: [104] send error
at io.questdb.cutlass.line.tcp.PlainTcpLineChannel.send(PlainTcpLineChannel.java:100)
at io.questdb.cutlass.line.AbstractLineSender.sendAll(AbstractLineSender.java:380)
at io.questdb.cutlass.line.LineTcpSender.flush(LineTcpSender.java:81)
at io.questdb.kafka.QuestDBSinkTask.put(QuestDBSinkTask.java:75)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
... 10 more
[2022-12-09 21:43:47,423] WARN Could not stop task (org.apache.kafka.connect.runtime.WorkerSinkTask)
io.questdb.cutlass.line.LineSenderException: [32] send error
at io.questdb.cutlass.line.tcp.PlainTcpLineChannel.send(PlainTcpLineChannel.java:100)
at io.questdb.cutlass.line.AbstractLineSender.sendAll(AbstractLineSender.java:380)
at io.questdb.cutlass.line.LineTcpSender.flush(LineTcpSender.java:81)
at io.questdb.cutlass.line.AbstractLineSender.close(AbstractLineSender.java:111)
at io.questdb.kafka.QuestDBSinkTask.stop(QuestDBSinkTask.java:289)
at org.apache.kafka.connect.runtime.WorkerSinkTask.close(WorkerSinkTask.java:171)
at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:167)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:199)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
[2022-12-09 21:43:47,426] INFO [Consumer clientId=connector-consumer-questdb-sink-0, groupId=connect-questdb-sink] Revoke previously assigned partitions book-0, candles-0, ticker-0, trades-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-12-09 21:43:47,427] INFO [Consumer clientId=connector-consumer-questdb-sink-0, groupId=connect-questdb-sink] Member connector-consumer-questdb-sink-0-40875dda-9812-476c-8617-52a20314b30f sending LeaveGroup request to coordinator SOME_IP:SOME_PORT (id: 2147483647 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
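Editor's note on the error codes above: the bracketed numbers in LineSenderException are OS errno values (on Linux, 104 is ECONNRESET and 32 is EPIPE). Both mean the QuestDB server closed the ILP TCP connection, so the real cause has to be read from the server's own log, as happens later in this thread. A trivial lookup sketch:

```java
import java.util.Map;

public class SendErrorCodes {
    // Linux errno values as they appear in "[104] send error" / "[32] send error"
    static final Map<Integer, String> ERRNO = Map.of(
            104, "ECONNRESET: connection reset by peer",
            32, "EPIPE: broken pipe (peer closed the socket)");

    public static void main(String[] args) {
        // Both codes mean the server side dropped the TCP connection;
        // QuestDB does this when it cannot process an ILP line.
        System.out.println(ERRNO.get(104));
        System.out.println(ERRNO.get(32));
    }
}
```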
Jaromir Hamala
12/10/2022, 4:19 PM
Furkan Küçük
12/12/2022, 5:53 AM
2022-12-12T05:51:48.681936Z I i.q.c.l.t.TableUpdateDetails closing table writer [tableName=candles]
2022-12-12T05:51:48.681945Z I i.q.c.l.t.TableUpdateDetails closing table parsers [tableName=candles]
2022-12-12T05:51:48.681981Z I i.q.c.l.t.TableUpdateDetails closing table parsers [tableName=candles]
2022-12-12T05:51:48.682002Z I i.q.c.l.t.TableUpdateDetails closing table parsers [tableName=candles]
2022-12-12T05:51:48.682095Z I i.q.c.p.WriterPool >> [table=`candles`, thread=17]
2022-12-12T05:51:48.682373Z I i.q.c.l.t.LineTcpMeasurementScheduler assigned candles to thread 0
2022-12-12T05:51:48.682410Z I i.q.c.l.t.TableUpdateDetails network IO thread using table [workerId=2, tableName=candles, nNetworkIoWorkers=1]
2022-12-12T05:51:48.683024Z E i.q.c.l.t.LineTcpConnectionContext [22] could not process line data [table=candles, msg=cast error for line protocol float [columnWriterIndex=4, columnType=LONG, name=volume], errno=0]
2022-12-12T05:51:48.683189Z E i.q.c.l.t.LineTcpConnectionContext [22] could not parse measurement, NONE at 241, line (may be mangled due to partial parsing): 'candles symbol="BNB-USDT",receipt_timestamp=1.6708236602694917E9,start=1670823600i,trades=54i,volume=60.932,high=281.1,stop=1.670823659999E9,low=281i,closed=t,exchange="BINANCE",interval="1m",close=281.1,open=281.1,timestamp=1.670823660003E9'
2022-12-12T05:51:48.683226Z I tcp-line-server scheduling disconnect [fd=22, reason=0]
2022-12-12T05:51:48.683344Z I tcp-line-server disconnected [ip=172.27.0.3, fd=22, src=queue]
2022-12-12T05:51:48.683558Z I i.q.c.l.t.LineTcpWriterJob assigned table to writer thread [tableName=candles, threadId=0]
2022-12-12T05:51:49.559702Z I i.q.c.l.t.TableUpdateDetails network IO thread released table [workerId=2, tableName=candles, nNetworkIoWorkers=0]
2022-12-12T05:51:49.559734Z I i.q.c.l.t.LineTcpMeasurementScheduler active table going idle [tableName=candles]
2022-12-12T05:51:49.566590Z I i.q.c.l.t.LineTcpMeasurementScheduler releasing writer, its been idle since 2022-12-12T05:51:48.682000Z[tableName=candles]
2022-12-12T05:51:49.566666Z I i.q.c.p.WriterPool << [table=`candles`, thread=18]
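For context (editor's illustration, not connector code): in the InfluxDB line protocol the connector speaks, an integer field value carries a trailing `i` (as in `trades=54i`), while a bare number like `volume=60.932` is a float. The server log above shows `volume` arriving as a float while the column is LONG, hence the cast error. A rough classifier sketch of that rule:

```java
public class IlpFieldType {
    // Classify an ILP numeric field value the way the server log above does:
    // a trailing 'i' marks an integer, a bare number is a float.
    static String classify(String value) {
        if (value.endsWith("i") && value.length() > 1
                && Character.isDigit(value.charAt(value.length() - 2))) {
            return "integer";
        }
        try {
            Double.parseDouble(value);
            return "float";
        } catch (NumberFormatException e) {
            return "other";
        }
    }

    public static void main(String[] args) {
        System.out.println(classify("54i"));     // integer, like trades=54i
        System.out.println(classify("60.932"));  // float, like volume=60.932
    }
}
```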
Jaromir Hamala
12/12/2022, 8:28 AM
candles table via SQL? it looks like the volume column was created as LONG, but in Kafka it's a floating-point number (float or double)
Furkan Küçük
12/12/2022, 8:28 AM
Jaromir Hamala
12/12/2022, 8:31 AM
Furkan Küçük
12/12/2022, 8:32 AM
{
  "exchange": "BINANCE_FUTURES",
  "symbol": "ETH-USDT-22Z30",
  "start": 1670833800,
  "stop": 1670833859.999,
  "interval": "1m",
  "trades": 70,
  "open": 1245.94,
  "close": 1244.98,
  "high": 1246.11,
  "low": 1244.88,
  "volume": 78.709,
  "closed": true,
  "timestamp": 1670833862.004,
  "receipt_timestamp": 1670833862.3363352
}
This was obtained from kafkacat.
Jaromir Hamala
12/12/2022, 8:33 AM
volume is NOT a floating-point number, but an integer?
Furkan Küçük
12/12/2022, 8:35 AM
Jaromir Hamala
12/12/2022, 8:46 AM
{"exchange":"BINANCE","symbol":"TRX-BTC","start":1670832960,"stop":1670833019.999,"interval":"1m","trades":3,"open":0.00000308,"close":0.00000309,"high":0.00000309,"low":0.00000308,"volume":2769,"closed":true,"timestamp":1670833020.003,"receipt_timestamp":1670833020.1428316}
"volume":2769
the messages have no schema. so when a message like this is processed, the type of volume is inferred as long, and this is sent from Kafka to QuestDB. When the target table does not exist, it is created, and the volume column is created as long.
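The inference described above can be sketched like this (a simplified editor's illustration of schemaless JSON number handling, not the JsonConverter source): an integral JSON literal comes out as a Java Long, a fractional one as a Double.

```java
public class SchemalessNumber {
    // Simplified view of schemaless JSON number handling:
    // an integral literal maps to Long, a fractional one to Double.
    static Object infer(String jsonNumber) {
        if (jsonNumber.contains(".") || jsonNumber.contains("e") || jsonNumber.contains("E")) {
            return Double.parseDouble(jsonNumber);
        }
        return Long.parseLong(jsonNumber);
    }

    public static void main(String[] args) {
        System.out.println(infer("2769").getClass().getSimpleName());   // Long
        System.out.println(infer("462.1").getClass().getSimpleName());  // Double
    }
}
```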
but then the connector picks up a message like this:
{
  "exchange": "BINANCE",
  "symbol": "EOS-USDT",
  "start": 1670832960,
  "stop": 1670833019.999,
  "interval": "1m",
  "trades": 2,
  "open": 0.974,
  "close": 0.974,
  "high": 0.974,
  "low": 0.974,
  "volume": 462.1,
  "closed": true,
  "timestamp": 1670833020.003,
  "receipt_timestamp": 1670833020.1456804
}
here the volume is a float, so the connector sends it as float. But the table was already created with volume as long, and you cannot insert a floating-point number into a long column -> error.
the best you can do is to create the table upfront via SQL, with the volume column as double or float
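A sketch of what that upfront DDL could look like (editor's illustration; column types are guessed from the JSON messages above, and the designated-timestamp choice is an assumption — adjust as needed):

```sql
-- Hypothetical pre-created table so that 'volume' is double from the start
CREATE TABLE candles (
    exchange SYMBOL,
    symbol SYMBOL,
    start LONG,
    stop DOUBLE,
    interval SYMBOL,
    trades LONG,
    open DOUBLE,
    close DOUBLE,
    high DOUBLE,
    low DOUBLE,
    volume DOUBLE,
    closed BOOLEAN,
    timestamp TIMESTAMP
) TIMESTAMP(timestamp);
```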
Furkan Küçük
12/12/2022, 9:04 AM
Jaromir Hamala
12/12/2022, 9:10 AM
Furkan Küçük
12/12/2022, 9:13 AM
Jaromir Hamala
12/12/2022, 9:13 AM
connect.kafka().produce(topicName, "key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}");
connect.kafka().produce(topicName, "key", "{\"firstname\":\"Jane\",\"lastname\":\"Doe\",\"age\":42.5}");
the first message is an integer number and the 2nd is a floating point.
normally this would fail, but this line props.put(QuestDBSinkConnectorConfig.DOUBLE_COLUMNS_CONFIG, "age");
forces the connector to always send age as a floating point.
Furkan Küçük
12/12/2022, 10:08 AM
Jaromir Hamala
12/12/2022, 10:09 AM
double and long both use 64 bits, so they are equal in this respect. the difference is that double loses precision: basically you get a wider range and floating-point values, and you give up some precision. The double type is specified in IEEE 754.
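To make the precision point concrete (editor's sketch): an IEEE 754 double has a 53-bit significand, so whole numbers above 2^53 can no longer be represented exactly, while a long holds them fine.

```java
public class DoublePrecision {
    public static void main(String[] args) {
        long exact = 1L << 53;    // 9007199254740992: still exact as a double
        long beyond = exact + 1;  // 9007199254740993: not representable as a double

        // Round-tripping through double silently loses the last unit
        System.out.println((long) (double) exact == exact);    // true
        System.out.println((long) (double) beyond == beyond);  // false
    }
}
```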