QuestDB Community Slack: txn-inflight errors, an ORDER BY crash, and a Debezium → Kafka Connect → QuestDB pipeline

Hà Hữu

10/06/2022, 4:10 AM
Hi, I'm getting
max txn-inflight limit reached [txn=27503, min=11119, size=16384]
while querying. How can I fix it? Thanks

Andrey Pechkurov

10/06/2022, 6:38 AM
Hello. What version are you using? We fixed a similar bug in one of the recent patch releases, so you should try the latest release, 6.5.3.
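For readers following along: the questdb_1 container name in the logs below suggests docker-compose, in which case the upgrade would look roughly like this (a sketch; the service name questdb is an assumption):
# docker-compose.yml: bump the tag, e.g. image: questdb/questdb:6.5.3
docker-compose pull questdb
docker-compose up -d questdb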

Hà Hữu

10/06/2022, 6:39 AM
Hi, updating to the latest version fixed it, but after a few minutes QuestDB stops accepting connections...

Andrey Pechkurov

10/06/2022, 6:42 AM
What does it look like? Any errors in the server logs?

Hà Hữu

10/06/2022, 6:42 AM
yes, when it happens again I'll send you the error log

Andrey Pechkurov

10/06/2022, 6:45 AM
👍

Hà Hữu

10/06/2022, 6:47 AM
@Andrey Pechkurov
questdb_1                | 2022-10-06T06:46:13.498904Z C i.q.c.p.PGWireServer internal error [ex=
questdb_1                | java.lang.NullPointerException: Cannot invoke "io.questdb.griffin.engine.orderby.RecordTreeChain$TreeCursor.close()" because "this.chainCursor" is null
questdb_1                |      at io.questdb.griffin.engine.orderby.SortedRecordCursor.close(SortedRecordCursor.java:44)
questdb_1                |      at io.questdb.griffin.engine.orderby.SortedRecordCursorFactory._close(SortedRecordCursorFactory.java:65)
questdb_1                |      at io.questdb.cairo.AbstractRecordCursorFactory.close(AbstractRecordCursorFactory.java:50)
questdb_1                |      at io.questdb.griffin.engine.LimitRecordCursorFactory._close(LimitRecordCursorFactory.java:62)
questdb_1                |      at io.questdb.cairo.AbstractRecordCursorFactory.close(AbstractRecordCursorFactory.java:50)
questdb_1                |      at io.questdb.std.Misc.free(Misc.java:46)
questdb_1                |      at io.questdb.cutlass.pgwire.TypesAndSelect.close(TypesAndSelect.java:42)
questdb_1                |      at io.questdb.std.Misc.free(Misc.java:46)
questdb_1                |      at io.questdb.std.AssociativeCache.put(AssociativeCache.java:137)
questdb_1                |      at io.questdb.cutlass.pgwire.PGConnectionContext.clearCursorAndFactory(PGConnectionContext.java:1056)
questdb_1                |      at io.questdb.cutlass.pgwire.PGConnectionContext.prepareForNewBatchQuery(PGConnectionContext.java:1707)
questdb_1                |      at io.questdb.cutlass.pgwire.PGConnectionContext.prepareForNewQuery(PGConnectionContext.java:1718)
questdb_1                |      at io.questdb.cutlass.pgwire.PGConnectionContext.parse(PGConnectionContext.java:1570)
questdb_1                |      at io.questdb.cutlass.pgwire.PGConnectionContext.handleClientOperation(PGConnectionContext.java:415)
questdb_1                |      at io.questdb.cutlass.pgwire.PGJobContext.handleClientOperation(PGJobContext.java:84)
questdb_1                |      at io.questdb.cutlass.pgwire.PGWireServer$1.lambda$$0(PGWireServer.java:85)
questdb_1                |      at io.questdb.network.AbstractIODispatcher.processIOQueue(AbstractIODispatcher.java:181)
questdb_1                |      at io.questdb.cutlass.pgwire.PGWireServer$1.run(PGWireServer.java:112)
questdb_1                |      at io.questdb.mp.Worker.run(Worker.java:118)
questdb_1                | ]
I got this in QuestDB.
Is there any way to fix it now?

Andrey Pechkurov

10/06/2022, 7:20 AM
This is clearly a bug. Could you create a GH issue with this error, the query that you're running, and your table schema? https://github.com/questdb/questdb/issues/new?assignees=&labels=bug&template=bug_report.yaml
The error is related to the ORDER BY used in the query. If possible, you could try to remove ORDER BY from the query or rewrite the query so that the ORDER BY is at a different level - this should help you avoid the error until we release the bugfix

Hà Hữu

10/06/2022, 7:25 AM
I can't remove the ORDER BY because I need to use LIMIT...
Another question: is there any way to stop Kafka from syncing duplicates to QuestDB? I'm getting a lot of duplicated data...

Andrey Pechkurov

10/06/2022, 7:51 AM
because i need to use limit...
then it may be possible to rewrite the query to work around the bug
as for Kafka, @Jaromir Hamala might know better

Nicolas Hourcard

10/06/2022, 8:19 AM
@Hà Hữu I think you could use Flink to avoid duplicates. Jaromir will confirm

Jaromir Hamala

10/06/2022, 8:20 AM
Hello @Hà Hữu, are the duplicates already present in the Kafka topics, or are they somehow created on the way between Kafka and QuestDB?

Andrey Pechkurov

10/06/2022, 10:14 AM
@Hà Hữu could you share your table schema and the query? I'm working on a similar bugfix and I might fix your issue while I'm at it

Hà Hữu

10/06/2022, 10:31 AM
My table
CREATE TABLE trades (
  id INT,
  market_id STRING,
  price DOUBLE,
  amount DOUBLE,
  total DOUBLE,
  maker_order_id INT,
  taker_order_id INT,
  maker_id INT,
  taker_id INT,
  side STRING,
  created_at TIMESTAMP,
  updated_at TIMESTAMP
)
TIMESTAMP(created_at)
PARTITION BY DAY;
I'll send the query soon.
The Kafka issue is fine after all, sorry, my bad.

Jaromir Hamala

10/06/2022, 10:57 AM
ok, glad to hear that. May I ask what you use to transfer data from Kafka to QuestDB? Do you use the Kafka Connect infrastructure or something you wrote on your own?

Hà Hữu

10/06/2022, 10:58 AM
I'm using Kafka Connect

Jaromir Hamala

10/06/2022, 10:58 AM
JDBC Connector?

Hà Hữu

10/06/2022, 10:58 AM
yes, the JDBC Sink
Debezium to JDBC
Hi @Andrey Pechkurov, I've tested many queries and they work fine on their own; QuestDB only hits that problem right when my app starts up...
SELECT * FROM (
  SELECT
    FIRST(price) AS open,
    max(price) AS high,
    min(price) AS low,
    last(price) AS close,
    sum(amount) AS volume,
    created_at as timestamp
  FROM
          trades
  WHERE
    market_id = 'btcusdt' AND created_at > dateadd('m', 60, systimestamp())
  SAMPLE BY 60m
  FILL(NULL, NULL, NULL, NULL, 0) ALIGN TO CALENDAR
) ORDER BY timestamp DESC LIMIT 0, 1
this is my full SQL query
ok, I think I've caught it in the logs:
questdb_1                | 2022-10-06T11:18:41.962005Z I i.q.c.p.PGConnectionContext parse [fd=8, q=
questdb_1                |              SELECT * FROM (
questdb_1                |                      SELECT
questdb_1                |                              FIRST(price) AS open,
questdb_1                |                              max(price) AS high,
questdb_1                |                              min(price) AS low,
questdb_1                |                              last(price) AS close,
questdb_1                |                              sum(amount) AS volume,
questdb_1                |                              created_at as timestamp
questdb_1                |                      FROM
questdb_1                |                              trades
questdb_1                |                      WHERE
questdb_1                |                              market_id = 'ltcusdt' AND created_at > dateadd('m', -1440, systimestamp())
questdb_1                |                      SAMPLE BY 1440m
questdb_1                |                      FILL(NULL, NULL, NULL, NULL, 0) ALIGN TO CALENDAR
questdb_1                |              ) ORDER BY timestamp DESC LIMIT -1, 0]
questdb_1                | 2022-10-06T11:18:41.962020Z I i.q.c.p.PGConnectionContext query cache used [fd=8]
questdb_1                | 2022-10-06T11:18:41.962022Z I i.q.c.p.PGConnectionContext prepare [name=lrupsc_31_4]
questdb_1                | 2022-10-06T11:18:41.962099Z I i.q.c.p.PGConnectionContext query cache used [fd=8]
questdb_1                | 2022-10-06T11:18:41.962602Z I i.q.c.p.PGConnectionContext parse [fd=8, q=
questdb_1                |              SELECT * FROM (
questdb_1                |                      SELECT
questdb_1                |                              FIRST(price) AS open,
questdb_1                |                              max(price) AS high,
questdb_1                |                              min(price) AS low,
questdb_1                |                              last(price) AS close,
questdb_1                |                              sum(amount) AS volume,
questdb_1                |                              created_at as timestamp
questdb_1                |                      FROM
questdb_1                |                              trades
questdb_1                |                      WHERE
questdb_1                |                              market_id = 'ltcusdt' AND created_at > dateadd('m', -4320, systimestamp())
questdb_1                |                      SAMPLE BY 4320m
questdb_1                |                      FILL(NULL, NULL, NULL, NULL, 0) ALIGN TO CALENDAR
questdb_1                |              ) ORDER BY timestamp DESC LIMIT -1, 0]
questdb_1                | 2022-10-06T11:18:41.962827Z I i.q.g.SqlCompiler plan [q=`select-choose open, high, low, close, volume, timestamp from (select-group-by [FIRST(price) open, max(price) high, min(price) low, last(price) close, sum(amount) volume, timestamp] FIRST(price) open, max(price) high, min(price) low, last(price) close, sum(amount) volume, timestamp from (select-choose [price, amount, created_at timestamp] price, amount, created_at timestamp from (select [price, amount, created_at, market_id] from trades timestamp (created_at) where market_id = 'ltcusdt' and created_at > dateadd('m',-(4320),systimestamp()))) sample by 4320m fill(NULL,NULL,NULL,NULL,0) align to calendar with offset '00:00') order by timestamp desc limit -(1),0`, fd=8]
questdb_1                | 2022-10-06T11:18:41.963100Z I i.q.c.p.PGConnectionContext prepare [name=lrupsc_31_5]
questdb_1                | 2022-10-06T11:18:41.963159Z C i.q.c.p.PGWireServer internal error [ex=
questdb_1                | java.lang.NullPointerException: Cannot invoke "io.questdb.griffin.engine.orderby.RecordTreeChain$TreeCursor.close()" because "this.chainCursor" is null
questdb_1                |      at io.questdb.griffin.engine.orderby.SortedRecordCursor.close(SortedRecordCursor.java:44)
questdb_1                |      at io.questdb.griffin.engine.orderby.SortedRecordCursorFactory._close(SortedRecordCursorFactory.java:65)
questdb_1                |      at io.questdb.cairo.AbstractRecordCursorFactory.close(AbstractRecordCursorFactory.java:50)
questdb_1                |      at io.questdb.griffin.engine.LimitRecordCursorFactory._close(LimitRecordCursorFactory.java:62)
questdb_1                |      at io.questdb.cairo.AbstractRecordCursorFactory.close(AbstractRecordCursorFactory.java:50)
questdb_1                |      at io.questdb.std.Misc.free(Misc.java:46)
questdb_1                |      at io.questdb.cutlass.pgwire.TypesAndSelect.close(TypesAndSelect.java:42)
questdb_1                |      at io.questdb.std.Misc.free(Misc.java:46)
questdb_1                |      at io.questdb.std.AssociativeCache.put(AssociativeCache.java:137)
questdb_1                |      at io.questdb.cutlass.pgwire.PGConnectionContext.clearCursorAndFactory(PGConnectionContext.java:1056)
questdb_1                |      at io.questdb.cutlass.pgwire.PGConnectionContext.prepareForNewBatchQuery(PGConnectionContext.java:1707)
questdb_1                |      at io.questdb.cutlass.pgwire.PGConnectionContext.prepareForNewQuery(PGConnectionContext.java:1718)
questdb_1                |      at io.questdb.cutlass.pgwire.PGConnectionContext.parse(PGConnectionContext.java:1570)
questdb_1                |      at io.questdb.cutlass.pgwire.PGConnectionContext.handleClientOperation(PGConnectionContext.java:415)
questdb_1                |      at io.questdb.cutlass.pgwire.PGJobContext.handleClientOperation(PGJobContext.java:84)
questdb_1                |      at io.questdb.cutlass.pgwire.PGWireServer$1.lambda$$0(PGWireServer.java:85)
questdb_1                |      at io.questdb.network.AbstractIODispatcher.processIOQueue(AbstractIODispatcher.java:181)
questdb_1                |      at io.questdb.cutlass.pgwire.PGWireServer$1.run(PGWireServer.java:112)
questdb_1                |      at io.questdb.mp.Worker.run(Worker.java:118)
questdb_1                | ]
questdb_1                | 2022-10-06T11:18:41.963210Z I pg-server scheduling disconnect [fd=8, reason=17]
questdb_1                | 2022-10-06T11:18:41.963235Z I pg-server disconnected [ip=172.29.0.31, fd=8, src=queue]
questdb_1                | 2022-10-06T11:18:41.963323Z C server-main unhandled error [job=io.questdb.network.IODispatcherLinux@3ce3db41, ex=
questdb_1                | java.lang.NullPointerException: Cannot invoke "io.questdb.griffin.engine.orderby.RecordTreeChain$TreeCursor.close()" because "this.chainCursor" is null
questdb_1                |      at io.questdb.griffin.engine.orderby.SortedRecordCursor.close(SortedRecordCursor.java:44)
questdb_1                |      at io.questdb.griffin.engine.orderby.SortedRecordCursorFactory._close(SortedRecordCursorFactory.java:65)
questdb_1                |      at io.questdb.cairo.AbstractRecordCursorFactory.close(AbstractRecordCursorFactory.java:50)
questdb_1                |      at io.questdb.griffin.engine.LimitRecordCursorFactory._close(LimitRecordCursorFactory.java:62)
questdb_1                |      at io.questdb.cairo.AbstractRecordCursorFactory.close(AbstractRecordCursorFactory.java:50)
questdb_1                |      at io.questdb.std.Misc.free(Misc.java:46)
questdb_1                |      at io.questdb.cutlass.pgwire.TypesAndSelect.close(TypesAndSelect.java:42)
questdb_1                |      at io.questdb.std.Misc.free(Misc.java:46)
questdb_1                |      at io.questdb.std.AssociativeCache.put(AssociativeCache.java:137)
questdb_1                |      at io.questdb.cutlass.pgwire.PGConnectionContext.clearCursorAndFactory(PGConnectionContext.java:1056)
questdb_1                |      at io.questdb.cutlass.pgwire.PGConnectionContext.prepareForNewBatchQuery(PGConnectionContext.java:1707)
questdb_1                |      at io.questdb.cutlass.pgwire.PGConnectionContext.prepareForNewQuery(PGConnectionContext.java:1718)
questdb_1                |      at io.questdb.cutlass.pgwire.PGConnectionContext.clear(PGConnectionContext.java:301)
questdb_1                |      at io.questdb.std.WeakMutableObjectPool.clear(WeakMutableObjectPool.java:54)
questdb_1                |      at io.questdb.std.WeakMutableObjectPool.clear(WeakMutableObjectPool.java:31)
questdb_1                |      at io.questdb.std.WeakObjectPoolBase.push(WeakObjectPoolBase.java:63)
questdb_1                |      at io.questdb.std.WeakMutableObjectPool.push(WeakMutableObjectPool.java:42)
questdb_1                |      at io.questdb.network.MutableIOContextFactory.done(MutableIOContextFactory.java:62)
questdb_1                |      at io.questdb.network.MutableIOContextFactory.done(MutableIOContextFactory.java:35)
questdb_1                |      at io.questdb.network.AbstractIODispatcher.doDisconnect(AbstractIODispatcher.java:289)
questdb_1                |      at io.questdb.network.AbstractIODispatcher.disconnectContext(AbstractIODispatcher.java:274)
questdb_1                |      at io.questdb.mp.SCSequence.consumeAll(SCSequence.java:73)
questdb_1                |      at io.questdb.network.AbstractIODispatcher.processDisconnects(AbstractIODispatcher.java:309)
questdb_1                |      at io.questdb.network.IODispatcherLinux.runSerially(IODispatcherLinux.java:119)
questdb_1                |      at io.questdb.mp.SynchronizedJob.run(SynchronizedJob.java:39)
questdb_1                |      at io.questdb.mp.Worker.run(Worker.java:118)
questdb_1                | ]
questdb_1                | 2022-10-06T11:19:05.834692Z I i.q.c.h.p.StaticContentProcessor [372] incoming [url=/assets/vs/base/worker/workerMain.js]

Andrey Pechkurov

10/06/2022, 11:33 AM
Thanks!

Hà Hữu

10/06/2022, 11:36 AM
should I create an issue on GitHub?

Andrey Pechkurov

10/06/2022, 11:37 AM
not really
dateadd('m', 60, systimestamp())
should be
dateadd('m', -60, systimestamp())
unless you have data from the future

Hà Hữu

10/06/2022, 11:38 AM
hmm, maybe I copied it here wrong, but in my source code it's a negative number

Andrey Pechkurov

10/06/2022, 11:39 AM
dateadd('m', -60, systimestamp())
gives you a timestamp 60 minutes in the past
while
dateadd('m', 60, systimestamp())
is a future timestamp
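To make the sign convention concrete, here is a quick sketch using standard QuestDB functions (long_sequence(1) is just a one-row dummy table):
SELECT
  systimestamp()                    AS current_ts,
  dateadd('m', -60, systimestamp()) AS one_hour_ago,   -- negative offset: the past
  dateadd('m', 60, systimestamp())  AS one_hour_ahead  -- positive offset: the future
FROM long_sequence(1);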

Hà Hữu

10/06/2022, 11:43 AM
yes, the number in my source code points to the past
so I think this is a bug

Andrey Pechkurov

10/06/2022, 11:50 AM
Your fix will (hopefully) be included in this PR: https://github.com/questdb/questdb/pull/2598
Once it passes code review, I can share a snapshot QuestDB build, so that you can use it until we release the next patch version.

Hà Hữu

10/06/2022, 11:59 AM
yes thanks i’ll check it every hours 😒miling_face_with_tear:
@Andrey Pechkurov do you plan to add support limit & order for SAMPLE BY query?

Andrey Pechkurov

10/06/2022, 12:48 PM
It's currently supported via a sub-query, just like in the query you're running. If you'd like to see it at the same level as SAMPLE BY, please submit a feature request: https://github.com/questdb/questdb/issues/new?assignees=&labels=New+feature&template=feature_request.yml
added it

Andrey Pechkurov

10/06/2022, 12:50 PM
My bad, didn't check the existing issues :(
No near-term plans AFAIK - we're pretty busy with replication, but we could add support in the future

Jaromir Hamala

10/06/2022, 12:51 PM
it’s brewing, hopefully the next release
it’s a silly bug in the SQL parser

Hà Hữu

10/06/2022, 12:52 PM
I have one more request. For example, I have a table named
trades
with a price column; I can build klines from it using
SELECT FIRST(price) as open, MIN(price) as low, MAX(price) as high, LAST(price) as close FROM trades SAMPLE BY 15m FILL(PREV, PREV, PREV, PREV);
but if there are no trades for
1 hour
some rows get filled by PREV. That's good, except that for a kline the open, high, and low of an empty bucket should all be filled with the PREV of close, not their own previous values.
I have talked about this with @Pei

Andrey Pechkurov

10/06/2022, 3:40 PM
Hopefully, it resolves the bug for you

Pei

10/07/2022, 1:23 PM
Hi @Hà Hữu did the snapshot resolve your issue?

Hà Hữu

10/09/2022, 1:38 AM
Hi, I'll try it today. Can I get the branch? I'll build a Docker image.

Andrey Pechkurov

10/09/2022, 6:13 AM
can i get the branch ?
Not sure if I got the question. Could you elaborate?

Hà Hữu

10/09/2022, 7:20 AM
Ahh, no problem, sorry
I've tried the same SQL in the QuestDB web console and it still works there
ahh, the problem was the tasks.max value; with it set to 1 there's no more problem

Jaromir Hamala

10/09/2022, 7:56 AM
yeah, that’s right. there cannot be parallel SQL inserts
would you be willing to try the ILP-based Kafka connector? I can help you set it up
with ILP you can have more than 1 Kafka Connect task

Hà Hữu

10/09/2022, 8:06 AM
yes, I need it
can you share it with me? yesterday I noticed the data syncing very slowly...

Jaromir Hamala

10/09/2022, 8:07 AM
do you have the Kafka connector project checked out? if not, then please run:
git clone https://github.com/questdb/kafka-questdb-connector.git

Hà Hữu

10/09/2022, 8:12 AM
do you have a JAR? I'm not a Java developer...

Jaromir Hamala

10/09/2022, 8:13 AM
sure, give me a second
I understand you are currently using the Kafka Connect JDBC sink. Do you know whether you run Kafka Connect in Distributed or Standalone mode?

Hà Hữu

10/09/2022, 8:15 AM
thanks, I'll try it now
I'm using Confluent Kafka Connect & Redpanda

Jaromir Hamala

10/09/2022, 8:17 AM
can you paste your JDBC sink configuration here? or perhaps send it to me privately, obviously with all secrets edited out.
I can help turn it into a QuestDB Sink configuration

Hà Hữu

10/09/2022, 8:17 AM
{
  "auto.create": "false",
  "auto.evolve": "false",
  "connection.password": "quest",
  "connection.url": "jdbc:<postgresql://questdb:8812/qdb?useSSL=false>",
  "connection.user": "admin",
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "dialect.name": "PostgreSqlDatabaseDialect",
  "insert.mode": "insert",
  "key.converter.schemas.enable": "true",
  "name": "questdb_jdbc_sink",
  "pk.mode": "none",
  "tasks.max": "20",
  "topics": "pg.trades, pg.activities,pg.operations_assets,pg.operations_expenses,pg.operations_liabilities,pg.operations_revenues",
  "transforms": "unwrap,dropPrefix, addPrefix, createdAtTimestampConverter, updatedAtTimestampConverter",
  "transforms.addPrefix.regex": ".*",
  "transforms.addPrefix.replacement": "$0",
  "transforms.addPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.createdAtTimestampConverter.field": "created_at",
  "transforms.createdAtTimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSS",
  "transforms.createdAtTimestampConverter.target.type": "string",
  "transforms.createdAtTimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
  "transforms.dropPrefix.regex": "pg.(.*)",
  "transforms.dropPrefix.replacement": "$1",
  "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.unwrap.drop.tombstones": "false",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.updatedAtTimestampConverter.field": "updated_at",
  "transforms.updatedAtTimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSS",
  "transforms.updatedAtTimestampConverter.target.type": "string",
  "transforms.updatedAtTimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
  "value.converter.schemas.enable": "true"
}
this is my config

Jaromir Hamala

10/09/2022, 8:21 AM
please try this:
{
  "host": "quest",
  "connector.class": "io.questdb.kafka.QuestDBSinkConnector",
  "key.converter.schemas.enable": "true",
  "name": "questdb_ilp_sink",
  "tasks.max": "1",
  "topics": "pg.trades, pg.activities,pg.operations_assets,pg.operations_expenses,pg.operations_liabilities,pg.operations_revenues",
  "transforms": "unwrap,dropPrefix, addPrefix, createdAtTimestampConverter, updatedAtTimestampConverter",
  "transforms.addPrefix.regex": ".*",
  "transforms.addPrefix.replacement": "$0",
  "transforms.addPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.createdAtTimestampConverter.field": "created_at",
  "transforms.createdAtTimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSS",
  "transforms.createdAtTimestampConverter.target.type": "string",
  "transforms.createdAtTimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
  "transforms.dropPrefix.regex": "pg.(.*)",
  "transforms.dropPrefix.replacement": "$1",
  "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.unwrap.drop.tombstones": "false",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.updatedAtTimestampConverter.field": "updated_at",
  "transforms.updatedAtTimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSS",
  "transforms.updatedAtTimestampConverter.target.type": "string",
  "transforms.updatedAtTimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
  "value.converter.schemas.enable": "true"
}

Hà Hữu

10/09/2022, 8:22 AM
yes, thanks, I'll try it now

Jaromir Hamala

10/09/2022, 8:22 AM
are you running Kafka Connect inside a Docker container? Do you know what Java version is used for Kafka Connect?

Hà Hữu

10/09/2022, 8:24 AM
not sure about this... it's the image built by confluentinc

Jaromir Hamala

10/09/2022, 8:24 AM
right. what’s the name of the image?

Hà Hữu

10/09/2022, 8:24 AM
confluentinc/cp-kafka-connect:7.2.2
the sink is up now, I'll test syncing

Jaromir Hamala

10/09/2022, 8:28 AM
ok, let me know. I will add a Debezium source to our testing pipeline and write a guide

Hà Hữu

10/09/2022, 8:29 AM
good
🎉 thank you so much

Jaromir Hamala

10/09/2022, 8:29 AM
is the record in QuestDB?
because this log means the Debezium source read a change in your Postgres(?) database and sent it to Kafka. now the QuestDB ILP sink has to send the record to QuestDB

Hà Hữu

10/09/2022, 8:31 AM
yes, I'm trying again
yeah, it's not showing up in QuestDB
😐
but I don't get any error

Jaromir Hamala

10/09/2022, 8:33 AM
any error in the log? can you try
curl <kafka_connect_host>:8083/connectors/

Hà Hữu

10/09/2022, 8:34 AM
yes, I watched the logs, restarted, and tried creating a new record; nothing in the log

Jaromir Hamala

10/09/2022, 8:35 AM
what’s the output of
curl <kafka_connect_host>:8083/connectors
?

Hà Hữu

10/09/2022, 8:35 AM
give me a sec

Jaromir Hamala

10/09/2022, 8:37 AM
ok, now please do
curl localhost:8083/connectors/questdb_ilp_sink/status
ok, this looks good. no error indeed. so the question is why it’s not pushing anything to questdb
can you execute
show tables;
in QuestDB SQL Console?

Hà Hữu

10/09/2022, 8:39 AM
🥲 I really can't understand it

Jaromir Hamala

10/09/2022, 8:41 AM
are you sure there are no new records?

Hà Hữu

10/09/2022, 8:41 AM
yep, I've tried querying by id

Jaromir Hamala

10/09/2022, 8:41 AM
can you please share the QuestDB log?

Hà Hữu

10/09/2022, 8:41 AM
sure

Jaromir Hamala

10/09/2022, 8:42 AM
this is ILP - by default there is a commit lag. it means records will be visible only after some time. the lag is configurable.
so maybe it’s not visible yet. but the questdb log should tell us more.
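For reference, the commit behavior is tunable in server.conf; the key names below are from the QuestDB 6.x configuration docs, but defaults vary between versions, so treat this as a sketch:
# server.conf
cairo.commit.lag=1000                   # expected out-of-order window for writes, in milliseconds
cairo.max.uncommitted.rows=500000       # force a commit once this many uncommitted rows accumulate
line.tcp.maintenance.job.interval=1000  # how often (ms) idle ILP connections are checked and committed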

Hà Hữu

10/09/2022, 8:46 AM
Hi

Jaromir Hamala

10/09/2022, 8:46 AM
anything in kafka connect log?

Hà Hữu

10/09/2022, 8:46 AM
let me check again
I'm not sure, but I'll send you everything I have
after I create a record, the state of the consumer group changes:
Consumer group exists, but does not have any members
got the error now
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: io.questdb.cutlass.line.LineSenderException: [32] send error
	at io.questdb.cutlass.line.tcp.PlainTcpLineChannel.send(PlainTcpLineChannel.java:100)
	at io.questdb.cutlass.line.AbstractLineSender.sendAll(AbstractLineSender.java:380)
	at io.questdb.cutlass.line.LineTcpSender.flush(LineTcpSender.java:81)
	at io.questdb.kafka.QuestDBSinkTask.put(QuestDBSinkTask.java:52)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
	... 10 more

Jaromir Hamala

10/09/2022, 8:52 AM
ok, that’s progress! 🙂 anything in the questdb logs?

Hà Hữu

10/09/2022, 8:52 AM
2022-10-09T08:47:08.614492Z E i.q.c.l.t.LineTcpConnectionContext [7] could not parse measurement, NONE at 374, line (may be mangled due to partial parsing): 'activities key_id=10484252i,id=10484252i,user_id=5i,category="user",user_ip="58.186.164.249",user_ip_country="VN",user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36",topic="session",action="login",result="succeed",device="fake",created_at="2022-08-13 15:53:54.095",updated_at="2022-08-13 15:53:54.095"'
I think that's the error
👀

Jaromir Hamala

10/09/2022, 8:53 AM
anything above that?

Hà Hữu

10/09/2022, 8:53 AM
just that
2022-10-09T08:47:08.614483Z E i.q.c.l.t.LineTcpConnectionContext [7] could not process line data [table=activities, msg=cast error for line protocol string [columnWriterIndex=13, columnType=TIMESTAMP], errno=0]
questdb_1               | 2022-10-09T08:47:08.614492Z E i.q.c.l.t.LineTcpConnectionContext [7] could not parse measurement, NONE at 374, line (may be mangled due to partial parsing): 'activities key_id=10484252i,id=10484252i,user_id=5i,category="user",user_ip="58.186.164.249",user_ip_country="VN",user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36",topic="session",action="login",result="succeed",device="fake",created_at="2022-08-13 15:53:54.095",updated_at="2022-08-13 15:53:54.095"'
questdb_1               | 2022-10-09T08:47:08.614504Z I tcp-line-server scheduling disconnect [fd=7, reason=0]

Jaromir Hamala

10/09/2022, 8:55 AM
what’s the schema of your table?
I think we are getting close. I think the schema expected by the ILP connector is different from your actual schema

Hà Hữu

10/09/2022, 8:57 AM
hmm, I don't see any differences :|

Jaromir Hamala

10/09/2022, 8:57 AM
ok, yes, that’s it. the connector sends the columns
created_at
&
updated_at
as strings, even though they should be timestamps. that should be solvable via transforms.

Hà Hữu

10/09/2022, 8:58 AM
do I need to convert it to nanoseconds?
got it
I changed the timestamp to the Timestamp type

Jaromir Hamala

10/09/2022, 9:03 AM
there will be multiple steps: 1. you need to delete the existing connector. you can do it via
curl -X DELETE localhost:8083/connectors/questdb_ilp_sink
2. then submit it with a different configuration:
{
  "host": "quest",
  "connector.class": "io.questdb.kafka.QuestDBSinkConnector",
  "key.converter.schemas.enable": "true",
  "name": "questdb_ilp_sink",
  "tasks.max": "1",
  "topics": "pg.trades, pg.activities,pg.operations_assets,pg.operations_expenses,pg.operations_liabilities,pg.operations_revenues",
  "transforms": "unwrap,dropPrefix, addPrefix, createdAtTimestampConverter, updatedAtTimestampConverter",
  "transforms.addPrefix.regex": ".*",
  "transforms.addPrefix.replacement": "$0",
  "transforms.addPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.createdAtTimestampConverter.field": "created_at",
  "transforms.createdAtTimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSS",
  "transforms.createdAtTimestampConverter.target.type": "unix",
  "transforms.createdAtTimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
  "transforms.dropPrefix.regex": "pg.(.*)",
  "transforms.dropPrefix.replacement": "$1",
  "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.unwrap.drop.tombstones": "false",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.updatedAtTimestampConverter.field": "updated_at",
  "transforms.updatedAtTimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSS",
  "transforms.updatedAtTimestampConverter.target.type": "Timestamp",
  "transforms.updatedAtTimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
  "value.converter.schemas.enable": "true",
  "timestamp.field.name": "created_at"
}

Hà Hữu

10/09/2022, 9:03 AM
but the timestamp is 1970 🥲

Jaromir Hamala

10/09/2022, 9:06 AM
we are almost there
If you include
"timestamp.field.name": "created_at"
in the connector configuration, then the date should be OK

Hà Hữu

10/09/2022, 9:07 AM
yeah, it works now with your config
many thanks

Jaromir Hamala

10/09/2022, 9:07 AM
woohooo!

Hà Hữu

10/09/2022, 9:07 AM
🙏
I'll try with 20 tasks

Jaromir Hamala

10/09/2022, 9:07 AM
👍
bear in mind ILP is much faster than JDBC. so perhaps just 1 task is enough
20 definitely sounds too much
but feel free to experiment

Hà Hữu

10/09/2022, 9:12 AM
@Jaromir Hamala I've got a question

Jaromir Hamala

10/09/2022, 9:12 AM
go ahead

Hà Hữu

10/09/2022, 9:13 AM
QuestDB creates two rows, one for the create and one for the update
😐
how can I solve that?

Jaromir Hamala

10/09/2022, 9:15 AM
ILP is append-only. you will have the complete history of all events. you can use LATEST ON to get the latest row
QuestDB is heavily optimized for appending new rows. The ILP protocol reflects that. SQL can do UPDATEs too, but that’s meant to be used exceptionally, for example for fixing bad data, as it does quite a lot of work and it’s not fast.
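A sketch of the LATEST ON approach against the trades table from earlier (market_id is a STRING in that schema; declaring it as SYMBOL would make this query considerably faster):
-- newest row per market, by the designated timestamp
SELECT *
FROM trades
LATEST ON created_at PARTITION BY market_id;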

Hà Hữu

10/09/2022, 9:18 AM
sometimes I don't update anything but it still records an event 😐

Jaromir Hamala

10/09/2022, 9:18 AM
what do you mean? can you elaborate?

Hà Hữu

10/09/2022, 9:19 AM
I mean Debezium still records an event even when I don't update anything
sorry, my bad 😄

Jaromir Hamala

10/09/2022, 9:19 AM
Debezium reads your database’s logs. It should not create records out of thin air. hopefully 🙂
may I ask what you use as a source for Debezium? is it Postgres? Or some other DB?

Hà Hữu

10/09/2022, 9:20 AM
yes, it's Postgres
does Debezium have a config for insert events only?

Jaromir Hamala

10/09/2022, 9:21 AM
not sure, let me check.

Hà Hữu

10/09/2022, 9:26 AM
maybe not...
🥲

Jaromir Hamala

10/09/2022, 9:27 AM
If there is no configuration in Debezium, it should be possible to filter out UPDATEs via Kafka Connect Transforms
can you paste your Debezium connector configuration here?

Hà Hữu

10/09/2022, 9:36 AM
{
  "column.exclude.list": "public.users.password_digest,public.users.data",
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "database.dbname": "barong_production",
  "database.hostname": "db",
  "database.password": "zsmartex",
  "database.server.name": "pg",
  "database.user": "zsmartex",
  "decimal.handling.mode": "double",
  "key.converter.schemas.enable": "true",
  "name": "pg-barong-connector",
  "slot.name": "barong_connector",
  "snapshot.mode": "never",
  "table.include.list": "public.activities,public.api_keys,public.profiles,public.attachments,public.users",
  "time.precision.mode": "connect",
  "transforms": "dropPrefix",
  "transforms.dropPrefix.regex": "pg.public.(.*)",
  "transforms.dropPrefix.replacement": "pg.$1",
  "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "value.converter.schemas.enable": "true"
}
the unwrap transform always makes it create one more 😵‍💫

Jaromir Hamala

10/09/2022, 9:40 AM
we can filter before the unwrap transform
we could actually filter on the source side. so only INSERT events will make it to Kafka
this might be your best shot: https://debezium.io/documentation/reference/stable/transformations/filtering.html applied on the Debezium side. each message emitted by Debezium has a field
op
c = create
u = update
d = delete
r = read (applies only to snapshots)
t = truncate
m = message
so you would use the filter to keep only messages where
op == 'c'
yeah, something along these lines
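A sketch of what that could look like with Debezium's filter SMT (it needs a JSR-223 scripting engine such as Groovy on the Connect worker's classpath; see the docs linked above):
"transforms": "filterInserts",
"transforms.filterInserts.type": "io.debezium.transforms.Filter",
"transforms.filterInserts.language": "jsr223.groovy",
"transforms.filterInserts.condition": "value.op == 'c'"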

Hà Hữu

10/09/2022, 10:14 AM
I'll try it very soon
anyway, thanks for your help 🙌

Jaromir Hamala

10/09/2022, 10:55 AM
no problem at all! I will explore it in parallel, I’m already working on adding a Debezium scenario to our testing pipeline
Hello @Hà Hữu, good news: the Debezium source has a configuration property
skipped.operations
see https://debezium.io/documentation/reference/stable/connectors/postgresql.html and search for
skipped.operations
it should be enough to add this property to
{
  "column.exclude.list": "public.users.password_digest,public.users.data",
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "database.dbname": "barong_production",
  "database.hostname": "db",
  "database.password": "zsmartex",
  "database.server.name": "pg",
  "database.user": "zsmartex",
  "decimal.handling.mode": "double",
  "key.converter.schemas.enable": "true",
  "name": "pg-barong-connector",
  "slot.name": "barong_connector",
  "snapshot.mode": "never",
  "table.include.list": "public.activities,public.api_keys,public.profiles,public.attachments,public.users",
  "time.precision.mode": "connect",
  "transforms": "dropPrefix",
  "transforms.dropPrefix.regex": "pg.public.(.*)",
  "transforms.dropPrefix.replacement": "pg.$1",
  "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "value.converter.schemas.enable": "true"
}
something like
"skipped.operations": "u,d,t"

Hà Hữu

10/09/2022, 4:48 PM
Hi @Jaromir Hamala
I'm trying to sync Debezium to QuestDB via ILP, but I got an error about FDBigInteger
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NoClassDefFoundError: sun/misc/FDBigInteger
	at io.questdb.std.Numbers.append(Numbers.java:2214)
	at io.questdb.std.Numbers.append(Numbers.java:255)
	at io.questdb.std.Numbers.append(Numbers.java:211)
	at io.questdb.std.str.CharSink.put(CharSink.java:109)
	at io.questdb.cutlass.line.AbstractLineSender.field(AbstractLineSender.java:145)
	at io.questdb.cutlass.line.AbstractLineSender.doubleColumn(AbstractLineSender.java:151)
	at io.questdb.cutlass.line.AbstractLineSender.doubleColumn(AbstractLineSender.java:46)
	at io.questdb.kafka.QuestDBSinkTask.tryWritePhysicalTypeFromSchema(QuestDBSinkTask.java:180)
	at io.questdb.kafka.QuestDBSinkTask.handleObject(QuestDBSinkTask.java:118)
	at io.questdb.kafka.QuestDBSinkTask.handleStruct(QuestDBSinkTask.java:87)
	at io.questdb.kafka.QuestDBSinkTask.tryWritePhysicalTypeFromSchema(QuestDBSinkTask.java:196)
	at io.questdb.kafka.QuestDBSinkTask.handleObject(QuestDBSinkTask.java:118)
	at io.questdb.kafka.QuestDBSinkTask.handleSingleRecord(QuestDBSinkTask.java:63)
	at io.questdb.kafka.QuestDBSinkTask.put(QuestDBSinkTask.java:50)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
	... 10 more

Jaromir Hamala

10/09/2022, 4:50 PM
I see. that’s a known issue due to Java version compatibility. there is a workaround. earlier today I sent you a zip with two JAR files: 1. kafka-questdb-connector-0.1-SNAPSHOT.jar 2. questdb-6.5.2-jdk8.jar. you uploaded these 2 JAR files into Kafka Connect, is that right?

Hà Hữu

10/09/2022, 4:51 PM
yes
that's right

Jaromir Hamala

10/09/2022, 4:52 PM
so delete questdb-6.5.2-jdk8.jar from your Kafka Connect and replace it with this: https://repo1.maven.org/maven2/org/questdb/questdb/6.5.3/questdb-6.5.3.jar
then it should work

Hà Hữu

10/09/2022, 4:53 PM
I'll try it now
hmm, I got another error
questdb_1               | 2022-10-09T16:56:07.279604Z E i.q.c.l.t.LineTcpConnectionContext [163] could not process line data [table=operations_liabilities, msg=cast error for line protocol integer [columnWriterIndex=3, columnType=STRING], errno=0]
questdb_1               | 2022-10-09T16:56:07.279613Z E i.q.c.l.t.LineTcpConnectionContext [163] could not parse measurement, NONE at 196, line (may be mangled due to partial parsing): 'operations_liabilities key_id=11i,id=11i,code=212i,member_id=1i,currency_id="bnb",debit=0.015,credit=0.0,reference_type="Withdraw",reference_id=12i,updated_at=1660746601670000t 1660746601670000000'

Jaromir Hamala

10/09/2022, 4:59 PM
is there anything before this?
what is the schema of this table?

Hà Hữu

10/09/2022, 5:00 PM
no, nothing
just this error

Jaromir Hamala

10/09/2022, 5:01 PM
you have
member_id
as a string, but it looks like it's an integer in Kafka/Postgres?

Hà Hữu

10/09/2022, 5:01 PM
OMG
it's my bad
😵‍💫

Jaromir Hamala

10/09/2022, 5:01 PM
haha, I’m glad it’s not a problem with the connector 🙂

Hà Hữu

10/09/2022, 5:02 PM
ahh
why does ILP insert a key_id?
😐

Jaromir Hamala

10/09/2022, 5:03 PM
it’s the key from the Kafka message.
would you like to filter it out? it should be possible via a Kafka Connect Transform: setting the key to
null
should do it. but perhaps it’s a common use-case, so I can add explicit support for this to the QuestDB ILP connector

Hà Hữu

10/09/2022, 5:08 PM
yes, I think you should add it to the connector, but for now it's not a big problem for me
ILP is very fast, OMG 😵‍💫

Jaromir Hamala

10/09/2022, 5:10 PM
very cool! the connector will get even faster over time. I have done zero performance optimizations so far, I know some things can be improved. I’ll do it after the basic functionality is there.
here is a new version of the connector. it supports a new config option:
include.key
when you set it to
false
then message keys won’t be included in QuestDB tables. see this for all options: https://github.com/questdb/kafka-questdb-connector#configuration
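For example, the relevant subset of the sink config from earlier would gain one line (a sketch; the option name is per the README linked above):
{
  "name": "questdb_ilp_sink",
  "connector.class": "io.questdb.kafka.QuestDBSinkConnector",
  "include.key": "false"
}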

Hà Hữu

10/09/2022, 5:37 PM
I'll try it tomorrow
but I'm hitting a problem
I know ILP is very fast, but why does the consumer group have some message lag?
is there any batch.size config?

Jaromir Hamala

10/09/2022, 5:39 PM
Is the lag decreasing over time?

Hà Hữu

10/09/2022, 5:39 PM
yes

Jaromir Hamala

10/09/2022, 5:40 PM
Maybe there are out-of-order inserts? Is QuestDB busy? Anything about O3 in the QuestDB log?
Do you know if the messages in Kafka are out of order? You might want to try just 1 task. A high level of parallelism in Kafka might lead to out-of-order inserts in QuestDB

Hà Hữu

10/09/2022, 5:46 PM
ok, let me try 😵‍💫

Jaromir Hamala

10/09/2022, 5:51 PM
Maybe one task per table/topic would be best

Hà Hữu

10/09/2022, 5:54 PM
hmm, I'll try with one task per topic/table. for now I'm using one task, but it still lags for my trading bot
🥲 Elasticsearch too
hmm, maybe it's a hardware problem; Elasticsearch with one topic per index and 3 tasks is still like that
😵‍💫 anyway, thank you for today

Jaromir Hamala

10/09/2022, 6:22 PM
You are very welcome!
It helped me understand the requirements for the ILP Connect sink better. So thank you for that.