#users-public
Hà Hữu

10/06/2022, 4:10 AM
Hi, I'm getting
max txn-inflight limit reached [txn=27503, min=11119, size=16384]
while querying. How can I fix it? Thanks!
Andrey Pechkurov

10/06/2022, 6:38 AM
Hello! What version are you using? We fixed a similar bug in one of the recent patch releases, so you should try the latest release, 6.5.3.
Hà Hữu

10/06/2022, 6:39 AM
Hi, I fixed it by updating to the latest version, but after a few minutes QuestDB stops accepting connections...
Andrey Pechkurov

10/06/2022, 6:42 AM
What does it look like? Any errors in the server logs?
Hà Hữu

10/06/2022, 6:42 AM
Yes, I'll send you the error log when it happens.
Andrey Pechkurov

10/06/2022, 6:45 AM
👍
Hà Hữu

10/06/2022, 6:47 AM
@Andrey Pechkurov
questdb_1                | 2022-10-06T06:46:13.498904Z C i.q.c.p.PGWireServer internal error [ex=
questdb_1                | java.lang.NullPointerException: Cannot invoke "io.questdb.griffin.engine.orderby.RecordTreeChain$TreeCursor.close()" because "this.chainCursor" is null
questdb_1                |      at io.questdb.griffin.engine.orderby.SortedRecordCursor.close(SortedRecordCursor.java:44)
questdb_1                |      at io.questdb.griffin.engine.orderby.SortedRecordCursorFactory._close(SortedRecordCursorFactory.java:65)
questdb_1                |      at io.questdb.cairo.AbstractRecordCursorFactory.close(AbstractRecordCursorFactory.java:50)
questdb_1                |      at io.questdb.griffin.engine.LimitRecordCursorFactory._close(LimitRecordCursorFactory.java:62)
questdb_1                |      at io.questdb.cairo.AbstractRecordCursorFactory.close(AbstractRecordCursorFactory.java:50)
questdb_1                |      at io.questdb.std.Misc.free(Misc.java:46)
questdb_1                |      at io.questdb.cutlass.pgwire.TypesAndSelect.close(TypesAndSelect.java:42)
questdb_1                |      at io.questdb.std.Misc.free(Misc.java:46)
questdb_1                |      at io.questdb.std.AssociativeCache.put(AssociativeCache.java:137)
questdb_1                |      at io.questdb.cutlass.pgwire.PGConnectionContext.clearCursorAndFactory(PGConnectionContext.java:1056)
questdb_1                |      at io.questdb.cutlass.pgwire.PGConnectionContext.prepareForNewBatchQuery(PGConnectionContext.java:1707)
questdb_1                |      at io.questdb.cutlass.pgwire.PGConnectionContext.prepareForNewQuery(PGConnectionContext.java:1718)
questdb_1                |      at io.questdb.cutlass.pgwire.PGConnectionContext.parse(PGConnectionContext.java:1570)
questdb_1                |      at io.questdb.cutlass.pgwire.PGConnectionContext.handleClientOperation(PGConnectionContext.java:415)
questdb_1                |      at io.questdb.cutlass.pgwire.PGJobContext.handleClientOperation(PGJobContext.java:84)
questdb_1                |      at io.questdb.cutlass.pgwire.PGWireServer$1.lambda$$0(PGWireServer.java:85)
questdb_1                |      at io.questdb.network.AbstractIODispatcher.processIOQueue(AbstractIODispatcher.java:181)
questdb_1                |      at io.questdb.cutlass.pgwire.PGWireServer$1.run(PGWireServer.java:112)
questdb_1                |      at io.questdb.mp.Worker.run(Worker.java:118)
questdb_1                | ]
6:48 AM
I got this in QuestDB.
6:59 AM
Is there any way to fix it now?
Andrey Pechkurov

10/06/2022, 7:20 AM
This is clearly a bug. Could you create a GitHub issue with this error, the query that you're running, and your table schema? https://github.com/questdb/questdb/issues/new?assignees=&labels=bug&template=bug_report.yaml
7:21 AM
The error is related to the ORDER BY used in the query. If possible, you could try to remove the ORDER BY, or rewrite the query so that the ORDER BY sits on a different level. This should help you avoid the error until we release the bugfix.
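A hedged sketch of what "ORDER BY on a different level" could look like for the query shared later in this thread. Since SAMPLE BY output comes back ordered by the designated timestamp, a negative LIMIT can pick the last bucket without an explicit ORDER BY; the FILL clause is omitted for brevity, and the negative `dateadd` offset discussed later in the thread is used:

```sql
-- Hypothetical rewrite: take the last bucket with a negative LIMIT
-- instead of wrapping the SAMPLE BY in an ORDER BY ... DESC sub-query.
SELECT
  FIRST(price) AS open,
  max(price)   AS high,
  min(price)   AS low,
  last(price)  AS close,
  sum(amount)  AS volume,
  created_at   AS timestamp
FROM trades
WHERE market_id = 'btcusdt'
  AND created_at > dateadd('m', -60, systimestamp())
SAMPLE BY 60m ALIGN TO CALENDAR
LIMIT -1;
```

This is only a sketch; whether it avoids the bug depends on the query plan the server builds.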
Hà Hữu

10/06/2022, 7:25 AM
I can't remove the ORDER BY because I need to use LIMIT...
7:47 AM
Another question: is there any way to keep Kafka from syncing duplicates to QuestDB? I'm getting a lot of duplicated data...
Andrey Pechkurov

10/06/2022, 7:51 AM
"because I need to use LIMIT..."
Then it may be possible to rewrite the query to work around the bug.
7:51 AM
As for Kafka, @Jaromir Hamala might know better.
Nicolas Hourcard

10/06/2022, 8:19 AM
@Hà Hữu I think you could use Flink to avoid duplicates. Jaromir will confirm.
Jaromir Hamala

10/06/2022, 8:20 AM
Hello @Hà Hữu, are the duplicates already present in the Kafka topics, or are they somehow created on the way between Kafka and QuestDB?
Andrey Pechkurov

10/06/2022, 10:14 AM
@Hà Hữu could you share your table schema and the query? I'm working on a similar bugfix and might fix your issue while I'm at it.
Hà Hữu

10/06/2022, 10:31 AM
My table:
CREATE TABLE trades (
  id INT,
  market_id STRING,
  price DOUBLE,
  amount DOUBLE,
  total DOUBLE,
  maker_order_id INT,
  taker_order_id INT,
  maker_id INT,
  taker_id INT,
  side STRING,
  created_at TIMESTAMP,
  updated_at TIMESTAMP
)
TIMESTAMP(created_at)
PARTITION BY DAY;
10:31 AM
I'll send the query soon.
10:31 AM
The Kafka thing is no problem after all, sorry, my bad.
Jaromir Hamala

10/06/2022, 10:57 AM
OK, glad to hear that. May I ask what you use to transfer data from Kafka to QuestDB? Do you use the Kafka Connect infrastructure or something you wrote on your own?
Hà Hữu

10/06/2022, 10:58 AM
I'm using Kafka Connect.
Jaromir Hamala

10/06/2022, 10:58 AM
The JDBC Connector?
Hà Hữu

10/06/2022, 10:58 AM
Yes, the JDBC Sink.
10:58 AM
Debezium to JDBC.
11:14 AM
Hi @Andrey Pechkurov, I've tested many queries and they work fine; it's only when my app is starting up that QuestDB hits that problem...
11:14 AM
SELECT * FROM (
  SELECT
    FIRST(price) AS open,
    max(price) AS high,
    min(price) AS low,
    last(price) AS close,
    sum(amount) AS volume,
    created_at as timestamp
  FROM trades
  WHERE market_id = 'btcusdt' AND created_at > dateadd('m', 60, systimestamp())
  SAMPLE BY 60m
  FILL(NULL, NULL, NULL, NULL, 0) ALIGN TO CALENDAR
) ORDER BY timestamp DESC LIMIT 0, 1
11:14 AM
This is my full SQL query.
11:19 AM
OK, maybe I've got it.
11:19 AM
questdb_1                | 2022-10-06T11:18:41.962005Z I i.q.c.p.PGConnectionContext parse [fd=8, q=
questdb_1                |              SELECT * FROM (
questdb_1                |                      SELECT
questdb_1                |                              FIRST(price) AS open,
questdb_1                |                              max(price) AS high,
questdb_1                |                              min(price) AS low,
questdb_1                |                              last(price) AS close,
questdb_1                |                              sum(amount) AS volume,
questdb_1                |                              created_at as timestamp
questdb_1                |                      FROM
questdb_1                |                              trades
questdb_1                |                      WHERE
questdb_1                |                              market_id = 'ltcusdt' AND created_at > dateadd('m', -1440, systimestamp())
questdb_1                |                      SAMPLE BY 1440m
questdb_1                |                      FILL(NULL, NULL, NULL, NULL, 0) ALIGN TO CALENDAR
questdb_1                |              ) ORDER BY timestamp DESC LIMIT -1, 0]
questdb_1                | 2022-10-06T11:18:41.962020Z I i.q.c.p.PGConnectionContext query cache used [fd=8]
questdb_1                | 2022-10-06T11:18:41.962022Z I i.q.c.p.PGConnectionContext prepare [name=lrupsc_31_4]
questdb_1                | 2022-10-06T11:18:41.962099Z I i.q.c.p.PGConnectionContext query cache used [fd=8]
questdb_1                | 2022-10-06T11:18:41.962602Z I i.q.c.p.PGConnectionContext parse [fd=8, q=
questdb_1                |              SELECT * FROM (
questdb_1                |                      SELECT
questdb_1                |                              FIRST(price) AS open,
questdb_1                |                              max(price) AS high,
questdb_1                |                              min(price) AS low,
questdb_1                |                              last(price) AS close,
questdb_1                |                              sum(amount) AS volume,
questdb_1                |                              created_at as timestamp
questdb_1                |                      FROM
questdb_1                |                              trades
questdb_1                |                      WHERE
questdb_1                |                              market_id = 'ltcusdt' AND created_at > dateadd('m', -4320, systimestamp())
questdb_1                |                      SAMPLE BY 4320m
questdb_1                |                      FILL(NULL, NULL, NULL, NULL, 0) ALIGN TO CALENDAR
questdb_1                |              ) ORDER BY timestamp DESC LIMIT -1, 0]
questdb_1                | 2022-10-06T11:18:41.962827Z I i.q.g.SqlCompiler plan [q=`select-choose open, high, low, close, volume, timestamp from (select-group-by [FIRST(price) open, max(price) high, min(price) low, last(price) close, sum(amount) volume, timestamp] FIRST(price) open, max(price) high, min(price) low, last(price) close, sum(amount) volume, timestamp from (select-choose [price, amount, created_at timestamp] price, amount, created_at timestamp from (select [price, amount, created_at, market_id] from trades timestamp (created_at) where market_id = 'ltcusdt' and created_at > dateadd('m',-(4320),systimestamp()))) sample by 4320m fill(NULL,NULL,NULL,NULL,0) align to calendar with offset '00:00') order by timestamp desc limit -(1),0`, fd=8]
questdb_1                | 2022-10-06T11:18:41.963100Z I i.q.c.p.PGConnectionContext prepare [name=lrupsc_31_5]
questdb_1                | 2022-10-06T11:18:41.963159Z C i.q.c.p.PGWireServer internal error [ex=
questdb_1                | java.lang.NullPointerException: Cannot invoke "io.questdb.griffin.engine.orderby.RecordTreeChain$TreeCursor.close()" because "this.chainCursor" is null
questdb_1                |      at io.questdb.griffin.engine.orderby.SortedRecordCursor.close(SortedRecordCursor.java:44)
questdb_1                |      at io.questdb.griffin.engine.orderby.SortedRecordCursorFactory._close(SortedRecordCursorFactory.java:65)
questdb_1                |      at io.questdb.cairo.AbstractRecordCursorFactory.close(AbstractRecordCursorFactory.java:50)
questdb_1                |      at io.questdb.griffin.engine.LimitRecordCursorFactory._close(LimitRecordCursorFactory.java:62)
questdb_1                |      at io.questdb.cairo.AbstractRecordCursorFactory.close(AbstractRecordCursorFactory.java:50)
questdb_1                |      at io.questdb.std.Misc.free(Misc.java:46)
questdb_1                |      at io.questdb.cutlass.pgwire.TypesAndSelect.close(TypesAndSelect.java:42)
questdb_1                |      at io.questdb.std.Misc.free(Misc.java:46)
questdb_1                |      at io.questdb.std.AssociativeCache.put(AssociativeCache.java:137)
questdb_1                |      at io.questdb.cutlass.pgwire.PGConnectionContext.clearCursorAndFactory(PGConnectionContext.java:1056)
questdb_1                |      at io.questdb.cutlass.pgwire.PGConnectionContext.prepareForNewBatchQuery(PGConnectionContext.java:1707)
questdb_1                |      at io.questdb.cutlass.pgwire.PGConnectionContext.prepareForNewQuery(PGConnectionContext.java:1718)
questdb_1                |      at io.questdb.cutlass.pgwire.PGConnectionContext.parse(PGConnectionContext.java:1570)
questdb_1                |      at io.questdb.cutlass.pgwire.PGConnectionContext.handleClientOperation(PGConnectionContext.java:415)
questdb_1                |      at io.questdb.cutlass.pgwire.PGJobContext.handleClientOperation(PGJobContext.java:84)
questdb_1                |      at io.questdb.cutlass.pgwire.PGWireServer$1.lambda$$0(PGWireServer.java:85)
questdb_1                |      at io.questdb.network.AbstractIODispatcher.processIOQueue(AbstractIODispatcher.java:181)
questdb_1                |      at io.questdb.cutlass.pgwire.PGWireServer$1.run(PGWireServer.java:112)
questdb_1                |      at io.questdb.mp.Worker.run(Worker.java:118)
questdb_1                | ]
questdb_1                | 2022-10-06T11:18:41.963210Z I pg-server scheduling disconnect [fd=8, reason=17]
questdb_1                | 2022-10-06T11:18:41.963235Z I pg-server disconnected [ip=172.29.0.31, fd=8, src=queue]
questdb_1                | 2022-10-06T11:18:41.963323Z C server-main unhandled error [job=io.questdb.network.IODispatcherLinux@3ce3db41, ex=
questdb_1                | java.lang.NullPointerException: Cannot invoke "io.questdb.griffin.engine.orderby.RecordTreeChain$TreeCursor.close()" because "this.chainCursor" is null
questdb_1                |      at io.questdb.griffin.engine.orderby.SortedRecordCursor.close(SortedRecordCursor.java:44)
questdb_1                |      at io.questdb.griffin.engine.orderby.SortedRecordCursorFactory._close(SortedRecordCursorFactory.java:65)
questdb_1                |      at io.questdb.cairo.AbstractRecordCursorFactory.close(AbstractRecordCursorFactory.java:50)
questdb_1                |      at io.questdb.griffin.engine.LimitRecordCursorFactory._close(LimitRecordCursorFactory.java:62)
questdb_1                |      at io.questdb.cairo.AbstractRecordCursorFactory.close(AbstractRecordCursorFactory.java:50)
questdb_1                |      at io.questdb.std.Misc.free(Misc.java:46)
questdb_1                |      at io.questdb.cutlass.pgwire.TypesAndSelect.close(TypesAndSelect.java:42)
questdb_1                |      at io.questdb.std.Misc.free(Misc.java:46)
questdb_1                |      at io.questdb.std.AssociativeCache.put(AssociativeCache.java:137)
questdb_1                |      at io.questdb.cutlass.pgwire.PGConnectionContext.clearCursorAndFactory(PGConnectionContext.java:1056)
questdb_1                |      at io.questdb.cutlass.pgwire.PGConnectionContext.prepareForNewBatchQuery(PGConnectionContext.java:1707)
questdb_1                |      at io.questdb.cutlass.pgwire.PGConnectionContext.prepareForNewQuery(PGConnectionContext.java:1718)
questdb_1                |      at io.questdb.cutlass.pgwire.PGConnectionContext.clear(PGConnectionContext.java:301)
questdb_1                |      at io.questdb.std.WeakMutableObjectPool.clear(WeakMutableObjectPool.java:54)
questdb_1                |      at io.questdb.std.WeakMutableObjectPool.clear(WeakMutableObjectPool.java:31)
questdb_1                |      at io.questdb.std.WeakObjectPoolBase.push(WeakObjectPoolBase.java:63)
questdb_1                |      at io.questdb.std.WeakMutableObjectPool.push(WeakMutableObjectPool.java:42)
questdb_1                |      at io.questdb.network.MutableIOContextFactory.done(MutableIOContextFactory.java:62)
questdb_1                |      at io.questdb.network.MutableIOContextFactory.done(MutableIOContextFactory.java:35)
questdb_1                |      at io.questdb.network.AbstractIODispatcher.doDisconnect(AbstractIODispatcher.java:289)
questdb_1                |      at io.questdb.network.AbstractIODispatcher.disconnectContext(AbstractIODispatcher.java:274)
questdb_1                |      at io.questdb.mp.SCSequence.consumeAll(SCSequence.java:73)
questdb_1                |      at io.questdb.network.AbstractIODispatcher.processDisconnects(AbstractIODispatcher.java:309)
questdb_1                |      at io.questdb.network.IODispatcherLinux.runSerially(IODispatcherLinux.java:119)
questdb_1                |      at io.questdb.mp.SynchronizedJob.run(SynchronizedJob.java:39)
questdb_1                |      at io.questdb.mp.Worker.run(Worker.java:118)
questdb_1                | ]
questdb_1                | 2022-10-06T11:19:05.834692Z I i.q.c.h.p.StaticContentProcessor [372] incoming [url=/assets/vs/base/worker/workerMain.js]
Andrey Pechkurov

10/06/2022, 11:33 AM
Thanks!
Hà Hữu

10/06/2022, 11:36 AM
Do I need to create an issue on GitHub?
Andrey Pechkurov

10/06/2022, 11:37 AM
Not really.
11:37 AM
dateadd('m', 60, systimestamp())
should be
dateadd('m', -60, systimestamp())
unless you have data from the future
Hà Hữu

10/06/2022, 11:38 AM
Hmm, maybe I copied it here wrong, but in my source code it's a negative number.
Andrey Pechkurov

10/06/2022, 11:39 AM
dateadd('m', -60, systimestamp())
gives you a timestamp 60 minutes in the past from the current time,
11:39 AM
while
dateadd('m', 60, systimestamp())
is a future timestamp
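The sign difference above can be seen side by side; a tiny illustrative query (the literal results depend on when it runs):

```sql
-- Negative offset looks back, positive offset looks ahead
SELECT
  dateadd('m', -60, systimestamp()) AS one_hour_ago,
  dateadd('m',  60, systimestamp()) AS one_hour_ahead;
```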
Hà Hữu

10/06/2022, 11:43 AM
Yes, the number should be in the past.
11:45 AM
I think that was a typo on my part.
Andrey Pechkurov

10/06/2022, 11:50 AM
Your fix will (hopefully) be included in this PR: https://github.com/questdb/questdb/pull/2598
11:51 AM
Once it passes code review, I can share a snapshot QuestDB version that you can use until we release the next patch version.
Hà Hữu

10/06/2022, 11:59 AM
Yes, thanks, I'll check it every hour :smiling_face_with_tear:
12:46 PM
@Andrey Pechkurov do you plan to add LIMIT & ORDER BY support for SAMPLE BY queries?
Andrey Pechkurov

10/06/2022, 12:48 PM
It's currently supported via a sub-query, just like in the query you're running. If you'd like to see it at the same level as SAMPLE BY, please submit a feature request: https://github.com/questdb/questdb/issues/new?assignees=&labels=New+feature&template=feature_request.yml
12:49 PM
Added it.
Andrey Pechkurov

10/06/2022, 12:50 PM
My bad, I didn't check the existing issues 😦
12:50 PM
No near-term plans AFAIK; we're pretty busy with replication, but we could add support in the future.
Jaromir Hamala

10/06/2022, 12:51 PM
It's brewing; hopefully in the next release.
12:51 PM
It's a silly bug in the SQL parser.
Hà Hữu

10/06/2022, 12:52 PM
I have one more request. For example, I have a table named
trades
with a price column, and I can build a kline (candlestick chart) from it using
SELECT FIRST(price) as open, MIN(price) as low, MAX(price) as high, LAST(price) as close FROM trades SAMPLE BY 15m FILL(PREV, PREV, PREV, PREV);
But if there are no trades for an hour, the empty rows get filled by PREV, which is good; however, in a kline the open, high, low, and close of an empty bucket should all be filled with the PREV of the last price (the previous close).
12:52 PM
I've talked with @Pei
Andrey Pechkurov

10/06/2022, 3:40 PM
Hopefully, it resolves the bug for you
Pei

10/07/2022, 1:23 PM
Hi @Hà Hữu did the snapshot resolve your issue?
Hà Hữu

10/09/2022, 1:38 AM
Hi, I'll try it today. Can I get the branch? I'll build a Docker image.
Andrey Pechkurov

10/09/2022, 6:13 AM
"Can I get the branch?"
I'm not sure I got the question. Could you elaborate?
Hà Hữu

10/09/2022, 7:20 AM
Ah, no problem, sorry.
7:46 AM
I've tried with the QuestDB web console and the same SQL command now works.
7:50 AM
Ah, the problem was the tasks.max value; with 1 it's no longer a problem.
Jaromir Hamala

10/09/2022, 7:56 AM
Yeah, that's right. There cannot be parallel SQL inserts.
7:57 AM
Would you be willing to try the ILP-based Kafka connector? I can help you set it up.
7:57 AM
With ILP you can have more than one Kafka Connect task.
Hà Hữu

10/09/2022, 8:06 AM
Yes, I need it.
8:06 AM
Can you share it with me? Yesterday the data synced very slowly...
Jaromir Hamala

10/09/2022, 8:07 AM
Do you have the Kafka project checked out? If not, please do:
git clone https://github.com/questdb/kafka-questdb-connector.git
Hà Hữu

10/09/2022, 8:12 AM
Do you have a JAR? I'm not a Java developer...
Jaromir Hamala

10/09/2022, 8:13 AM
Sure, give me a second.
8:15 AM
I understand you are now using the Kafka Connect JDBC sink. Do you know if you use Kafka Connect in Distributed or Standalone mode?
Hà Hữu

10/09/2022, 8:15 AM
Thanks, I'll try it now.
8:16 AM
I'm using Confluent Kafka Connect & Redpanda.
Jaromir Hamala

10/09/2022, 8:17 AM
Can you paste your JDBC sink configuration here, or perhaps send it to me privately? Obviously with all secrets edited out.
8:17 AM
I can help turn it into a QuestDB Sink configuration.
Hà Hữu

10/09/2022, 8:17 AM
{
  "auto.create": "false",
  "auto.evolve": "false",
  "connection.password": "quest",
  "connection.url": "jdbc:postgresql://questdb:8812/qdb?useSSL=false",
  "connection.user": "admin",
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "dialect.name": "PostgreSqlDatabaseDialect",
  "insert.mode": "insert",
  "key.converter.schemas.enable": "true",
  "name": "questdb_jdbc_sink",
  "pk.mode": "none",
  "tasks.max": "20",
  "topics": "pg.trades, pg.activities,pg.operations_assets,pg.operations_expenses,pg.operations_liabilities,pg.operations_revenues",
  "transforms": "unwrap,dropPrefix, addPrefix, createdAtTimestampConverter, updatedAtTimestampConverter",
  "transforms.addPrefix.regex": ".*",
  "transforms.addPrefix.replacement": "$0",
  "transforms.addPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.createdAtTimestampConverter.field": "created_at",
  "transforms.createdAtTimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSS",
  "transforms.createdAtTimestampConverter.target.type": "string",
  "transforms.createdAtTimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
  "transforms.dropPrefix.regex": "pg.(.*)",
  "transforms.dropPrefix.replacement": "$1",
  "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.unwrap.drop.tombstones": "false",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.updatedAtTimestampConverter.field": "updated_at",
  "transforms.updatedAtTimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSS",
  "transforms.updatedAtTimestampConverter.target.type": "string",
  "transforms.updatedAtTimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
  "value.converter.schemas.enable": "true"
}
8:17 AM
This is my config.
Jaromir Hamala

10/09/2022, 8:21 AM
Please try this:
{
  "host": "quest",
  "connector.class": "io.questdb.kafka.QuestDBSinkConnector",
  "key.converter.schemas.enable": "true",
  "name": "questdb_ilp_sink",
  "tasks.max": "1",
  "topics": "pg.trades, pg.activities,pg.operations_assets,pg.operations_expenses,pg.operations_liabilities,pg.operations_revenues",
  "transforms": "unwrap,dropPrefix, addPrefix, createdAtTimestampConverter, updatedAtTimestampConverter",
  "transforms.addPrefix.regex": ".*",
  "transforms.addPrefix.replacement": "$0",
  "transforms.addPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.createdAtTimestampConverter.field": "created_at",
  "transforms.createdAtTimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSS",
  "transforms.createdAtTimestampConverter.target.type": "string",
  "transforms.createdAtTimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
  "transforms.dropPrefix.regex": "pg.(.*)",
  "transforms.dropPrefix.replacement": "$1",
  "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.unwrap.drop.tombstones": "false",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.updatedAtTimestampConverter.field": "updated_at",
  "transforms.updatedAtTimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSS",
  "transforms.updatedAtTimestampConverter.target.type": "string",
  "transforms.updatedAtTimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
  "value.converter.schemas.enable": "true"
}
Hà Hữu

10/09/2022, 8:22 AM
Thanks, I'll try it now.
Jaromir Hamala

10/09/2022, 8:22 AM
Are you running Kafka Connect inside a Docker container? Do you know which Java version is used for Kafka Connect?
Hà Hữu

10/09/2022, 8:24 AM
I'm not sure about this... it's the image built by Confluent.
Jaromir Hamala

10/09/2022, 8:24 AM
Right. What's the name of the image?
Hà Hữu

10/09/2022, 8:24 AM
confluentinc/cp-kafka-connect:7.2.2
8:27 AM
The sink is good now; I'll test the sync.
Jaromir Hamala

10/09/2022, 8:28 AM
OK, let me know. I will add a Debezium source to our testing pipeline and write a guide.
Hà Hữu

10/09/2022, 8:29 AM
Good.
8:29 AM
🎉 Thank you so much!
Jaromir Hamala

10/09/2022, 8:29 AM
Is the record in QuestDB?
8:30 AM
Because this log means the Debezium source read a change in your Postgres(?) database and sent it to Kafka. Now the QuestDB ILP Sink has to send the record to QuestDB.
Hà Hữu

10/09/2022, 8:31 AM
Yes, I'm trying again.
8:32 AM
Yeah, it's not found in QuestDB.
8:32 AM
😐
8:33 AM
But I don't get any error.
Jaromir Hamala

10/09/2022, 8:33 AM
Any error in the log? Can you try
curl <kafka_connect_host>:8083/connectors/
Hà Hữu

10/09/2022, 8:34 AM
Yes, I've watched the logs, restarted, and tried creating a new record; there's no log.
Jaromir Hamala

10/09/2022, 8:35 AM
what’s the output of
curl <kafka_connect_host>:8083/connectors
?
Hà Hữu

10/09/2022, 8:35 AM
Give me a sec.
Jaromir Hamala

10/09/2022, 8:37 AM
OK, now please do
curl localhost:8083/connectors/questdb_ilp_sink/status
8:38 AM
OK, this looks good. No error indeed. So the question is why it's not pushing anything to QuestDB.
8:39 AM
Can you execute
show tables;
in the QuestDB SQL console?
Hà Hữu

10/09/2022, 8:39 AM
:smiling_face_with_tear: I really can't understand it.
Jaromir Hamala

10/09/2022, 8:41 AM
are you sure there are no new records?
Hà Hữu

10/09/2022, 8:41 AM
Yep, I've tried querying by id.
Jaromir Hamala

10/09/2022, 8:41 AM
Can you please share the QuestDB log?
Hà Hữu

10/09/2022, 8:41 AM
Sure.
Jaromir Hamala

10/09/2022, 8:42 AM
This is ILP: by default there is a commit lag, which means records become visible only after some time. The lag is configurable.
8:42 AM
So maybe it's just not visible yet. But the QuestDB log should tell us more.
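For reference, the commit lag mentioned above was tunable in QuestDB 6.x through server.conf. A hedged sketch; the exact key names and defaults changed across releases, so check the docs for your version:

```properties
# server.conf (QuestDB 6.x) - illustrative values only
# Expected maximum out-of-order lag; also delays when ILP rows become visible
cairo.commit.lag=1000
# How often the ILP maintenance job commits idle tables (milliseconds)
line.tcp.maintenance.job.interval=1000
```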
Hà Hữu

10/09/2022, 8:46 AM
Hi
Jaromir Hamala

10/09/2022, 8:46 AM
Anything in the Kafka Connect log?
Hà Hữu

10/09/2022, 8:46 AM
Let me check again.
8:48 AM
I'm not sure, but I'll send you everything I have.
8:51 AM
After I create a record, the state of the consumer group changes.
8:51 AM
Consumer group exists, but does not have any members
8:51 AM
Got the error now:
8:51 AM
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: io.questdb.cutlass.line.LineSenderException: [32] send error
	at io.questdb.cutlass.line.tcp.PlainTcpLineChannel.send(PlainTcpLineChannel.java:100)
	at io.questdb.cutlass.line.AbstractLineSender.sendAll(AbstractLineSender.java:380)
	at io.questdb.cutlass.line.LineTcpSender.flush(LineTcpSender.java:81)
	at io.questdb.kafka.QuestDBSinkTask.put(QuestDBSinkTask.java:52)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
	... 10 more
Jaromir Hamala

10/09/2022, 8:52 AM
OK, that's progress! 🙂 Anything in the QuestDB logs?
Hà Hữu

10/09/2022, 8:52 AM
2022-10-09T08:47:08.614492Z E i.q.c.l.t.LineTcpConnectionContext [7] could not parse measurement, NONE at 374, line (may be mangled due to partial parsing): 'activities key_id=10484252i,id=10484252i,user_id=5i,category="user",user_ip="58.186.164.249",user_ip_country="VN",user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36",topic="session",action="login",result="succeed",device="fake",created_at="2022-08-13 15:53:54.095",updated_at="2022-08-13 15:53:54.095"'
8:52 AM
I think that's the error.
8:53 AM
👀
Jaromir Hamala

10/09/2022, 8:53 AM
anything above that?
Hà Hữu

10/09/2022, 8:53 AM
Just that.
8:53 AM
2022-10-09T08:47:08.614483Z E i.q.c.l.t.LineTcpConnectionContext [7] could not process line data [table=activities, msg=cast error for line protocol string [columnWriterIndex=13, columnType=TIMESTAMP], errno=0]
questdb_1               | 2022-10-09T08:47:08.614492Z E i.q.c.l.t.LineTcpConnectionContext [7] could not parse measurement, NONE at 374, line (may be mangled due to partial parsing): 'activities key_id=10484252i,id=10484252i,user_id=5i,category="user",user_ip="58.186.164.249",user_ip_country="VN",user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36",topic="session",action="login",result="succeed",device="fake",created_at="2022-08-13 15:53:54.095",updated_at="2022-08-13 15:53:54.095"'
questdb_1               | 2022-10-09T08:47:08.614504Z I tcp-line-server scheduling disconnect [fd=7, reason=0]
Jaromir Hamala

10/09/2022, 8:55 AM
What's the schema of your table?
8:55 AM
I think we are getting close. I think the schema expected by the ILP connector is different from your actual schema.
Hà Hữu

10/09/2022, 8:57 AM
Hmm, I don't see anything different 😐
Jaromir Hamala

10/09/2022, 8:57 AM
OK, yes, that's it. The connector sends the
created_at
&
updated_at
columns as strings, even though they should be timestamps. That should be solvable via transforms.
Hà Hữu

10/09/2022, 8:58 AM
Do I need to convert it to nanoseconds?
9:03 AM
Got it.
9:03 AM
I changed the target type to Timestamp.
Jaromir Hamala

10/09/2022, 9:03 AM
There will be multiple steps:
1. Delete the existing connector. You can do it via
curl -X DELETE localhost:8083/connectors/questdb_ilp_sink
2. Then submit it with a different configuration:
{
  "host": "quest",
  "connector.class": "io.questdb.kafka.QuestDBSinkConnector",
  "key.converter.schemas.enable": "true",
  "name": "questdb_ilp_sink",
  "tasks.max": "1",
  "topics": "pg.trades, pg.activities,pg.operations_assets,pg.operations_expenses,pg.operations_liabilities,pg.operations_revenues",
  "transforms": "unwrap,dropPrefix, addPrefix, createdAtTimestampConverter, updatedAtTimestampConverter",
  "transforms.addPrefix.regex": ".*",
  "transforms.addPrefix.replacement": "$0",
  "transforms.addPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.createdAtTimestampConverter.field": "created_at",
  "transforms.createdAtTimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSS",
  "transforms.createdAtTimestampConverter.target.type": "unix",
  "transforms.createdAtTimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
  "transforms.dropPrefix.regex": "pg.(.*)",
  "transforms.dropPrefix.replacement": "$1",
  "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.unwrap.drop.tombstones": "false",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.updatedAtTimestampConverter.field": "updated_at",
  "transforms.updatedAtTimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSS",
  "transforms.updatedAtTimestampConverter.target.type": "Timestamp",
  "transforms.updatedAtTimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
  "value.converter.schemas.enable": "true",
  "timestamp.field.name": "created_at"
}
Hà Hữu

10/09/2022, 9:03 AM
but the timestamp is 1970 🥲
Jaromir Hamala

10/09/2022, 9:06 AM
we are almost there
9:06 AM
If you include
"timestamp.field.name": "created_at"
in the connector configuration then the date should be OK
Hà Hữu

10/09/2022, 9:07 AM
yeah, it works now with your config
9:07 AM
many thanks
Jaromir Hamala

10/09/2022, 9:07 AM
woohooo!
Hà Hữu

10/09/2022, 9:07 AM
🙏
9:07 AM
i'll try with 20 tasks
Jaromir Hamala

10/09/2022, 9:07 AM
👍
9:08 AM
bear in mind ILP is much faster than JDBC. so perhaps just 1 task is enough
9:08 AM
20 definitely sounds like too much
9:08 AM
but feel free to experiment
Hà Hữu

10/09/2022, 9:12 AM
@Jaromir Hamala i have a question
Jaromir Hamala

10/09/2022, 9:12 AM
go ahead
Hà Hữu

10/09/2022, 9:13 AM
questdb will create two rows, 1 for the create and 1 for the update
9:13 AM
😐
9:14 AM
how can i solve that?
Jaromir Hamala

10/09/2022, 9:15 AM
ILP is append-only. you will have the complete history of all events. you can use LATEST ON to get the latest row
9:18 AM
QuestDB is heavily optimized for appending new rows. The ILP protocol reflects that. SQL can do UPDATEs too, but that’s meant to be used exceptionally, for example for fixing bad data, as it’s doing quite a lot of work and it’s not fast.
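To make the LATEST ON suggestion concrete, here is a sketch of such a query. The table name `users` and the columns `id` and `updated_at` (the designated timestamp) are illustrative assumptions, not taken from this thread:

```sql
-- Return only the most recent row per id from an append-only table.
-- "users", "id" and "updated_at" are hypothetical names.
SELECT *
FROM users
LATEST ON updated_at PARTITION BY id;
```

This yields one row per `id`, so the append-only event history behaves like an upserted table at query time.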
Hà Hữu

10/09/2022, 9:18 AM
sometimes i don't update anything but it still records an event 😐
Jaromir Hamala

10/09/2022, 9:18 AM
what do you mean? can you elaborate?
Hà Hữu

10/09/2022, 9:19 AM
i mean debezium still records an event (i didn't update anything)
9:19 AM
sorry my bad 😄
Jaromir Hamala

10/09/2022, 9:19 AM
Debezium is reading the logs of your database. It should not create records out of thin air. hopefully 🙂
9:20 AM
may I ask what you use as a source for Debezium? is it Postgres? Or some other DB?
Hà Hữu

10/09/2022, 9:20 AM
yes, it's Postgres
9:20 AM
does debezium have a config for insert events only?
Jaromir Hamala

10/09/2022, 9:21 AM
not sure, let me check.
Hà Hữu

10/09/2022, 9:26 AM
maybe not...
9:26 AM
🥲
Jaromir Hamala

10/09/2022, 9:27 AM
If there is no configuration in Debezium, it should be possible to filter out UPDATEs via Kafka Connect Transforms
9:35 AM
can you paste here your Debezium Connector configuration?
Hà Hữu

10/09/2022, 9:36 AM
{
  "column.exclude.list": "public.users.password_digest,public.users.data",
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "database.dbname": "barong_production",
  "database.hostname": "db",
  "database.password": "zsmartex",
  "database.server.name": "pg",
  "database.user": "zsmartex",
  "decimal.handling.mode": "double",
  "key.converter.schemas.enable": "true",
  "name": "pg-barong-connector",
  "slot.name": "barong_connector",
  "snapshot.mode": "never",
  "table.include.list": "public.activities,public.api_keys,public.profiles,public.attachments,public.users",
  "time.precision.mode": "connect",
  "transforms": "dropPrefix",
  "transforms.dropPrefix.regex": "pg.public.(.*)",
  "transforms.dropPrefix.replacement": "pg.$1",
  "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "value.converter.schemas.enable": "true"
}
9:38 AM
the unwrap transform always makes it create one more 😵‍💫
Jaromir Hamala

10/09/2022, 9:40 AM
we can filter before the unwrap transform
9:41 AM
we could actually filter on the source side. so only INSERT events will make it to Kafka
9:59 AM
this might be your best shot: https://debezium.io/documentation/reference/stable/transformations/filtering.html applied on the Debezium side. each message emitted by Debezium has a field
op
with one of these values:
c = create
u = update
d = delete
r = read (applies only to snapshots)
t = truncate
m = message
so you would use the filter to get only messages where
op == 'c'
10:12 AM
yeah, something along these lines
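A hedged sketch of what that filter could look like in the Debezium source connector configuration. It uses Debezium's scripting-based Filter SMT, which additionally requires the debezium-scripting module and a JSR-223 Groovy engine on the Kafka Connect classpath; the transform alias `filterCreates` is made up for illustration:

```json
{
  "transforms": "filterCreates",
  "transforms.filterCreates.type": "io.debezium.transforms.Filter",
  "transforms.filterCreates.language": "jsr223.groovy",
  "transforms.filterCreates.condition": "value.op == 'c'"
}
```

These keys would be merged into the existing source connector config, with the alias appended to any existing `transforms` list.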
Hà Hữu

10/09/2022, 10:14 AM
i'll try it very soon
10:14 AM
anyway thanks for your help 🙌
Jaromir Hamala

10/09/2022, 10:55 AM
no problem at all! I will explore it in parallel, I’m already working on adding a Debezium scenario to our testing pipeline
4:14 PM
Hello @Hà Hữu, good news: the Debezium source has a configuration property
skipped.operations
see https://debezium.io/documentation/reference/stable/connectors/postgresql.html and search for
skipped.operations
4:15 PM
it should be enough to add this property to
{
  "column.exclude.list": "public.users.password_digest,public.users.data",
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "database.dbname": "barong_production",
  "database.hostname": "db",
  "database.password": "zsmartex",
  "database.server.name": "pg",
  "database.user": "zsmartex",
  "decimal.handling.mode": "double",
  "key.converter.schemas.enable": "true",
  "name": "pg-barong-connector",
  "slot.name": "barong_connector",
  "snapshot.mode": "never",
  "table.include.list": "public.activities,public.api_keys,public.profiles,public.attachments,public.users",
  "time.precision.mode": "connect",
  "transforms": "dropPrefix",
  "transforms.dropPrefix.regex": "pg.public.(.*)",
  "transforms.dropPrefix.replacement": "pg.$1",
  "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "value.converter.schemas.enable": "true"
}
something like
"skipped.operations": "u,d,t"
Hà Hữu

10/09/2022, 4:48 PM
Hi @Jaromir Hamala
4:48 PM
i'm trying to sync debezium to questdb via ILP but i got an error about BigInteger
4:48 PM
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NoClassDefFoundError: sun/misc/FDBigInteger
	at io.questdb.std.Numbers.append(Numbers.java:2214)
	at io.questdb.std.Numbers.append(Numbers.java:255)
	at io.questdb.std.Numbers.append(Numbers.java:211)
	at io.questdb.std.str.CharSink.put(CharSink.java:109)
	at io.questdb.cutlass.line.AbstractLineSender.field(AbstractLineSender.java:145)
	at io.questdb.cutlass.line.AbstractLineSender.doubleColumn(AbstractLineSender.java:151)
	at io.questdb.cutlass.line.AbstractLineSender.doubleColumn(AbstractLineSender.java:46)
	at io.questdb.kafka.QuestDBSinkTask.tryWritePhysicalTypeFromSchema(QuestDBSinkTask.java:180)
	at io.questdb.kafka.QuestDBSinkTask.handleObject(QuestDBSinkTask.java:118)
	at io.questdb.kafka.QuestDBSinkTask.handleStruct(QuestDBSinkTask.java:87)
	at io.questdb.kafka.QuestDBSinkTask.tryWritePhysicalTypeFromSchema(QuestDBSinkTask.java:196)
	at io.questdb.kafka.QuestDBSinkTask.handleObject(QuestDBSinkTask.java:118)
	at io.questdb.kafka.QuestDBSinkTask.handleSingleRecord(QuestDBSinkTask.java:63)
	at io.questdb.kafka.QuestDBSinkTask.put(QuestDBSinkTask.java:50)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
	... 10 more
Jaromir Hamala

10/09/2022, 4:50 PM
I see. that’s a known issue due to Java version compatibility. there is a workaround. earlier today I sent you a zip with two JAR files:
1. kafka-questdb-connector-0.1-SNAPSHOT.jar
2. questdb-6.5.2-jdk8.jar
you uploaded these 2 JAR files into Kafka Connect. is that right?
Hà Hữu

10/09/2022, 4:51 PM
yes
4:51 PM
that's right
Jaromir Hamala

10/09/2022, 4:52 PM
so delete questdb-6.5.2-jdk8.jar from your Kafka Connect and replace it with this: https://repo1.maven.org/maven2/org/questdb/questdb/6.5.3/questdb-6.5.3.jar
4:52 PM
then it should work
Hà Hữu

10/09/2022, 4:53 PM
i'll try it now
4:57 PM
hmm i got another error
4:57 PM
questdb_1               | 2022-10-09T16:56:07.279604Z E i.q.c.l.t.LineTcpConnectionContext [163] could not process line data [table=operations_liabilities, msg=cast error for line protocol integer [columnWriterIndex=3, columnType=STRING], errno=0]
questdb_1               | 2022-10-09T16:56:07.279613Z E i.q.c.l.t.LineTcpConnectionContext [163] could not parse measurement, NONE at 196, line (may be mangled due to partial parsing): 'operations_liabilities key_id=11i,id=11i,code=212i,member_id=1i,currency_id="bnb",debit=0.015,credit=0.0,reference_type="Withdraw",reference_id=12i,updated_at=1660746601670000t 1660746601670000000'
Jaromir Hamala

10/09/2022, 4:59 PM
is there anything before this?
4:59 PM
what is the schema of this table?
Hà Hữu

10/09/2022, 5:00 PM
no, nothing
5:00 PM
just this error
Jaromir Hamala

10/09/2022, 5:01 PM
you have
memberId
as a string, but it looks like it's an Integer in Kafka/Postgres?
Hà Hữu

10/09/2022, 5:01 PM
OMG
5:01 PM
it's my bad
5:01 PM
😵‍💫
Jaromir Hamala

10/09/2022, 5:01 PM
haha, I’m glad it’s not a problem with the connector 🙂
Hà Hữu

10/09/2022, 5:02 PM
ahh
5:02 PM
why does ILP insert a key_id?
5:02 PM
😐
Jaromir Hamala

10/09/2022, 5:03 PM
it’s the key from the kafka message.
5:04 PM
would you like to filter it out? it should be possible via a Kafka Connect Transform. setting the key to
null
should do it. but perhaps it’s a common use-case, so I can add explicit support for this to the QuestDB ILP connector
Hà Hữu

10/09/2022, 5:08 PM
yes i think you should add it to the connector, but for now it's not a big problem for me
5:08 PM
ILP is very fast OMG 😵‍💫
Jaromir Hamala

10/09/2022, 5:10 PM
very cool! the connector will get even faster over time. I have done zero performance optimizations so far, I know some things can be improved. I’ll do it after the basic functionality is there.
5:19 PM
here is a new version of the connector. it supports a new config option:
include.key
when you set it to
false
then message keys won’t be included in questdb tables. see this for all options: https://github.com/questdb/kafka-questdb-connector#configuration
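For reference, a minimal sketch of how that option could slot into the sink configuration from earlier in the thread (abbreviated to the relevant keys; everything else would stay as before):

```json
{
  "name": "questdb_ilp_sink",
  "connector.class": "io.questdb.kafka.QuestDBSinkConnector",
  "include.key": "false"
}
```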
Hà Hữu

10/09/2022, 5:37 PM
i'll try it tomorrow
5:37 PM
but i'm having a problem
5:37 PM
i know ilp is very fast, but why does the consumer group have some message lag?
5:38 PM
is there any batch.size config?
Jaromir Hamala

10/09/2022, 5:39 PM
Is the lag decreasing over time?
Hà Hữu

10/09/2022, 5:39 PM
yes
Jaromir Hamala

10/09/2022, 5:40 PM
Maybe there are out-of-order inserts? Is questdb busy? Anything about O3 in the questdb log?
5:44 PM
Do you know if the messages in kafka are out of order? You might want to try just 1 task. High levels of parallelism in kafka might lead to out-of-order inserts in questdb
Hà Hữu

10/09/2022, 5:46 PM
ok let me try 😵‍💫
Jaromir Hamala

10/09/2022, 5:51 PM
Maybe one task per table/topic would be best
Hà Hữu

10/09/2022, 5:54 PM
hmm i'll try with one task per topic/table. for now i'm using one task but it still lags for my trading bot
5:54 PM
🥲 elasticsearch too
6:13 PM
hmm maybe it's a hardware problem. elasticsearch with one topic, one index, and 3 tasks is still like that
6:14 PM
😵‍💫 anyway, thank you for today
Jaromir Hamala

10/09/2022, 6:22 PM
You are very welcome!
6:23 PM
It helped me understand the requirements for the ILP sink connector better. So thank you for that.