Hà Hữu
10/06/2022, 4:10 AM
max txn-inflight limit reached [txn=27503, min=11119, size=16384]
while querying
how can i fix it?
Thanks
Andrey Pechkurov
10/06/2022, 6:38 AM
Hà Hữu
10/06/2022, 6:39 AM
Andrey Pechkurov
10/06/2022, 6:42 AM
Hà Hữu
10/06/2022, 6:42 AM
Andrey Pechkurov
10/06/2022, 6:45 AM
Hà Hữu
10/06/2022, 6:47 AM
questdb_1 | 2022-10-06T06:46:13.498904Z C i.q.c.p.PGWireServer internal error [ex=
questdb_1 | java.lang.NullPointerException: Cannot invoke "io.questdb.griffin.engine.orderby.RecordTreeChain$TreeCursor.close()" because "this.chainCursor" is null
questdb_1 | at io.questdb.griffin.engine.orderby.SortedRecordCursor.close(SortedRecordCursor.java:44)
questdb_1 | at io.questdb.griffin.engine.orderby.SortedRecordCursorFactory._close(SortedRecordCursorFactory.java:65)
questdb_1 | at io.questdb.cairo.AbstractRecordCursorFactory.close(AbstractRecordCursorFactory.java:50)
questdb_1 | at io.questdb.griffin.engine.LimitRecordCursorFactory._close(LimitRecordCursorFactory.java:62)
questdb_1 | at io.questdb.cairo.AbstractRecordCursorFactory.close(AbstractRecordCursorFactory.java:50)
questdb_1 | at io.questdb.std.Misc.free(Misc.java:46)
questdb_1 | at io.questdb.cutlass.pgwire.TypesAndSelect.close(TypesAndSelect.java:42)
questdb_1 | at io.questdb.std.Misc.free(Misc.java:46)
questdb_1 | at io.questdb.std.AssociativeCache.put(AssociativeCache.java:137)
questdb_1 | at io.questdb.cutlass.pgwire.PGConnectionContext.clearCursorAndFactory(PGConnectionContext.java:1056)
questdb_1 | at io.questdb.cutlass.pgwire.PGConnectionContext.prepareForNewBatchQuery(PGConnectionContext.java:1707)
questdb_1 | at io.questdb.cutlass.pgwire.PGConnectionContext.prepareForNewQuery(PGConnectionContext.java:1718)
questdb_1 | at io.questdb.cutlass.pgwire.PGConnectionContext.parse(PGConnectionContext.java:1570)
questdb_1 | at io.questdb.cutlass.pgwire.PGConnectionContext.handleClientOperation(PGConnectionContext.java:415)
questdb_1 | at io.questdb.cutlass.pgwire.PGJobContext.handleClientOperation(PGJobContext.java:84)
questdb_1 | at io.questdb.cutlass.pgwire.PGWireServer$1.lambda$$0(PGWireServer.java:85)
questdb_1 | at io.questdb.network.AbstractIODispatcher.processIOQueue(AbstractIODispatcher.java:181)
questdb_1 | at io.questdb.cutlass.pgwire.PGWireServer$1.run(PGWireServer.java:112)
questdb_1 | at io.questdb.mp.Worker.run(Worker.java:118)
questdb_1 | ]
Andrey Pechkurov
10/06/2022, 7:20 AM
Hà Hữu
10/06/2022, 7:25 AM
Andrey Pechkurov
10/06/2022, 7:51 AM
> because i need to use limit...
then it may be possible to re-write the query to work around the bug
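(A possible rewrite, sketched here rather than taken from the thread: the ORDER BY timestamp DESC LIMIT 0, 1 pattern used in the query shown further down can often be replaced by QuestDB's negative LIMIT. SAMPLE BY buckets come back in ascending timestamp order, so LIMIT -1 on the subquery returns the latest bucket without the sort stage that appears in the stack trace above. The time filter is also written with -60 here; see the dateadd discussion below.)
SELECT * FROM (
SELECT
FIRST(price) AS open,
max(price) AS high,
min(price) AS low,
last(price) AS close,
sum(amount) AS volume,
created_at AS timestamp
FROM trades
WHERE market_id = 'btcusdt' AND created_at > dateadd('m', -60, systimestamp())
SAMPLE BY 60m
FILL(NULL, NULL, NULL, NULL, 0) ALIGN TO CALENDAR
) LIMIT -1;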
Nicolas Hourcard
10/06/2022, 8:19 AM
Jaromir Hamala
10/06/2022, 8:20 AM
Andrey Pechkurov
10/06/2022, 10:14 AM
Hà Hữu
10/06/2022, 10:31 AM
CREATE TABLE trades (
id INT,
market_id STRING,
price DOUBLE,
amount DOUBLE,
total DOUBLE,
maker_order_id INT,
taker_order_id INT,
maker_id INT,
taker_id INT,
side STRING,
created_at TIMESTAMP,
updated_at TIMESTAMP
)
TIMESTAMP(created_at)
PARTITION BY DAY;
Jaromir Hamala
10/06/2022, 10:57 AM
Hà Hữu
10/06/2022, 10:58 AM
Jaromir Hamala
10/06/2022, 10:58 AM
Hà Hữu
10/06/2022, 10:58 AM
SELECT * FROM (
SELECT
FIRST(price) AS open,
max(price) AS high,
min(price) AS low,
last(price) AS close,
sum(amount) AS volume,
created_at as timestamp
FROM
trades
WHERE
market_id = 'btcusdt' AND created_at > dateadd('m', 60, systimestamp())
SAMPLE BY 60m
FILL(NULL, NULL, NULL, NULL, 0) ALIGN TO CALENDAR
) ORDER BY timestamp DESC LIMIT 0, 1
questdb_1 | 2022-10-06T11:18:41.962005Z I i.q.c.p.PGConnectionContext parse [fd=8, q=
questdb_1 | SELECT * FROM (
questdb_1 | SELECT
questdb_1 | FIRST(price) AS open,
questdb_1 | max(price) AS high,
questdb_1 | min(price) AS low,
questdb_1 | last(price) AS close,
questdb_1 | sum(amount) AS volume,
questdb_1 | created_at as timestamp
questdb_1 | FROM
questdb_1 | trades
questdb_1 | WHERE
questdb_1 | market_id = 'ltcusdt' AND created_at > dateadd('m', -1440, systimestamp())
questdb_1 | SAMPLE BY 1440m
questdb_1 | FILL(NULL, NULL, NULL, NULL, 0) ALIGN TO CALENDAR
questdb_1 | ) ORDER BY timestamp DESC LIMIT -1, 0]
questdb_1 | 2022-10-06T11:18:41.962020Z I i.q.c.p.PGConnectionContext query cache used [fd=8]
questdb_1 | 2022-10-06T11:18:41.962022Z I i.q.c.p.PGConnectionContext prepare [name=lrupsc_31_4]
questdb_1 | 2022-10-06T11:18:41.962099Z I i.q.c.p.PGConnectionContext query cache used [fd=8]
questdb_1 | 2022-10-06T11:18:41.962602Z I i.q.c.p.PGConnectionContext parse [fd=8, q=
questdb_1 | SELECT * FROM (
questdb_1 | SELECT
questdb_1 | FIRST(price) AS open,
questdb_1 | max(price) AS high,
questdb_1 | min(price) AS low,
questdb_1 | last(price) AS close,
questdb_1 | sum(amount) AS volume,
questdb_1 | created_at as timestamp
questdb_1 | FROM
questdb_1 | trades
questdb_1 | WHERE
questdb_1 | market_id = 'ltcusdt' AND created_at > dateadd('m', -4320, systimestamp())
questdb_1 | SAMPLE BY 4320m
questdb_1 | FILL(NULL, NULL, NULL, NULL, 0) ALIGN TO CALENDAR
questdb_1 | ) ORDER BY timestamp DESC LIMIT -1, 0]
questdb_1 | 2022-10-06T11:18:41.962827Z I i.q.g.SqlCompiler plan [q=`select-choose open, high, low, close, volume, timestamp from (select-group-by [FIRST(price) open, max(price) high, min(price) low, last(price) close, sum(amount) volume, timestamp] FIRST(price) open, max(price) high, min(price) low, last(price) close, sum(amount) volume, timestamp from (select-choose [price, amount, created_at timestamp] price, amount, created_at timestamp from (select [price, amount, created_at, market_id] from trades timestamp (created_at) where market_id = 'ltcusdt' and created_at > dateadd('m',-(4320),systimestamp()))) sample by 4320m fill(NULL,NULL,NULL,NULL,0) align to calendar with offset '00:00') order by timestamp desc limit -(1),0`, fd=8]
questdb_1 | 2022-10-06T11:18:41.963100Z I i.q.c.p.PGConnectionContext prepare [name=lrupsc_31_5]
questdb_1 | 2022-10-06T11:18:41.963159Z C i.q.c.p.PGWireServer internal error [ex=
questdb_1 | java.lang.NullPointerException: Cannot invoke "io.questdb.griffin.engine.orderby.RecordTreeChain$TreeCursor.close()" because "this.chainCursor" is null
questdb_1 | at io.questdb.griffin.engine.orderby.SortedRecordCursor.close(SortedRecordCursor.java:44)
questdb_1 | at io.questdb.griffin.engine.orderby.SortedRecordCursorFactory._close(SortedRecordCursorFactory.java:65)
questdb_1 | at io.questdb.cairo.AbstractRecordCursorFactory.close(AbstractRecordCursorFactory.java:50)
questdb_1 | at io.questdb.griffin.engine.LimitRecordCursorFactory._close(LimitRecordCursorFactory.java:62)
questdb_1 | at io.questdb.cairo.AbstractRecordCursorFactory.close(AbstractRecordCursorFactory.java:50)
questdb_1 | at io.questdb.std.Misc.free(Misc.java:46)
questdb_1 | at io.questdb.cutlass.pgwire.TypesAndSelect.close(TypesAndSelect.java:42)
questdb_1 | at io.questdb.std.Misc.free(Misc.java:46)
questdb_1 | at io.questdb.std.AssociativeCache.put(AssociativeCache.java:137)
questdb_1 | at io.questdb.cutlass.pgwire.PGConnectionContext.clearCursorAndFactory(PGConnectionContext.java:1056)
questdb_1 | at io.questdb.cutlass.pgwire.PGConnectionContext.prepareForNewBatchQuery(PGConnectionContext.java:1707)
questdb_1 | at io.questdb.cutlass.pgwire.PGConnectionContext.prepareForNewQuery(PGConnectionContext.java:1718)
questdb_1 | at io.questdb.cutlass.pgwire.PGConnectionContext.parse(PGConnectionContext.java:1570)
questdb_1 | at io.questdb.cutlass.pgwire.PGConnectionContext.handleClientOperation(PGConnectionContext.java:415)
questdb_1 | at io.questdb.cutlass.pgwire.PGJobContext.handleClientOperation(PGJobContext.java:84)
questdb_1 | at io.questdb.cutlass.pgwire.PGWireServer$1.lambda$$0(PGWireServer.java:85)
questdb_1 | at io.questdb.network.AbstractIODispatcher.processIOQueue(AbstractIODispatcher.java:181)
questdb_1 | at io.questdb.cutlass.pgwire.PGWireServer$1.run(PGWireServer.java:112)
questdb_1 | at io.questdb.mp.Worker.run(Worker.java:118)
questdb_1 | ]
questdb_1 | 2022-10-06T11:18:41.963210Z I pg-server scheduling disconnect [fd=8, reason=17]
questdb_1 | 2022-10-06T11:18:41.963235Z I pg-server disconnected [ip=172.29.0.31, fd=8, src=queue]
questdb_1 | 2022-10-06T11:18:41.963323Z C server-main unhandled error [job=io.questdb.network.IODispatcherLinux@3ce3db41, ex=
questdb_1 | java.lang.NullPointerException: Cannot invoke "io.questdb.griffin.engine.orderby.RecordTreeChain$TreeCursor.close()" because "this.chainCursor" is null
questdb_1 | at io.questdb.griffin.engine.orderby.SortedRecordCursor.close(SortedRecordCursor.java:44)
questdb_1 | at io.questdb.griffin.engine.orderby.SortedRecordCursorFactory._close(SortedRecordCursorFactory.java:65)
questdb_1 | at io.questdb.cairo.AbstractRecordCursorFactory.close(AbstractRecordCursorFactory.java:50)
questdb_1 | at io.questdb.griffin.engine.LimitRecordCursorFactory._close(LimitRecordCursorFactory.java:62)
questdb_1 | at io.questdb.cairo.AbstractRecordCursorFactory.close(AbstractRecordCursorFactory.java:50)
questdb_1 | at io.questdb.std.Misc.free(Misc.java:46)
questdb_1 | at io.questdb.cutlass.pgwire.TypesAndSelect.close(TypesAndSelect.java:42)
questdb_1 | at io.questdb.std.Misc.free(Misc.java:46)
questdb_1 | at io.questdb.std.AssociativeCache.put(AssociativeCache.java:137)
questdb_1 | at io.questdb.cutlass.pgwire.PGConnectionContext.clearCursorAndFactory(PGConnectionContext.java:1056)
questdb_1 | at io.questdb.cutlass.pgwire.PGConnectionContext.prepareForNewBatchQuery(PGConnectionContext.java:1707)
questdb_1 | at io.questdb.cutlass.pgwire.PGConnectionContext.prepareForNewQuery(PGConnectionContext.java:1718)
questdb_1 | at io.questdb.cutlass.pgwire.PGConnectionContext.clear(PGConnectionContext.java:301)
questdb_1 | at io.questdb.std.WeakMutableObjectPool.clear(WeakMutableObjectPool.java:54)
questdb_1 | at io.questdb.std.WeakMutableObjectPool.clear(WeakMutableObjectPool.java:31)
questdb_1 | at io.questdb.std.WeakObjectPoolBase.push(WeakObjectPoolBase.java:63)
questdb_1 | at io.questdb.std.WeakMutableObjectPool.push(WeakMutableObjectPool.java:42)
questdb_1 | at io.questdb.network.MutableIOContextFactory.done(MutableIOContextFactory.java:62)
questdb_1 | at io.questdb.network.MutableIOContextFactory.done(MutableIOContextFactory.java:35)
questdb_1 | at io.questdb.network.AbstractIODispatcher.doDisconnect(AbstractIODispatcher.java:289)
questdb_1 | at io.questdb.network.AbstractIODispatcher.disconnectContext(AbstractIODispatcher.java:274)
questdb_1 | at io.questdb.mp.SCSequence.consumeAll(SCSequence.java:73)
questdb_1 | at io.questdb.network.AbstractIODispatcher.processDisconnects(AbstractIODispatcher.java:309)
questdb_1 | at io.questdb.network.IODispatcherLinux.runSerially(IODispatcherLinux.java:119)
questdb_1 | at io.questdb.mp.SynchronizedJob.run(SynchronizedJob.java:39)
questdb_1 | at io.questdb.mp.Worker.run(Worker.java:118)
questdb_1 | ]
questdb_1 | 2022-10-06T11:19:05.834692Z I i.q.c.h.p.StaticContentProcessor [372] incoming [url=/assets/vs/base/worker/workerMain.js]
Andrey Pechkurov
10/06/2022, 11:33 AM
Hà Hữu
10/06/2022, 11:36 AM
Andrey Pechkurov
10/06/2022, 11:37 AM
dateadd('m', 60, systimestamp())
should be dateadd('m', -60, systimestamp())
unless you have data from the future
Hà Hữu
10/06/2022, 11:38 AM
Andrey Pechkurov
10/06/2022, 11:39 AM
dateadd('m', -60, systimestamp())
gives you the timestamp 60 minutes before the current time
dateadd('m', 60, systimestamp())
is a future timestamp
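(For reference, applying that correction to the query posted earlier, the filter for "the last 60 minutes" would read as follows — an editorial sketch, not a message from the thread.)
WHERE market_id = 'btcusdt' AND created_at > dateadd('m', -60, systimestamp())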
Hà Hữu
10/06/2022, 11:43 AM
Andrey Pechkurov
10/06/2022, 11:50 AM
Hà Hữu
10/06/2022, 11:59 AM
Andrey Pechkurov
10/06/2022, 12:48 PM
Hà Hữu
10/06/2022, 12:49 PM
Andrey Pechkurov
10/06/2022, 12:50 PM
Jaromir Hamala
10/06/2022, 12:51 PM
Hà Hữu
10/06/2022, 12:52 PM
trades
with column price
i can build kline from it using SELECT FIRST(price) as open, MIN(price) as low, MAX(price) as high, LAST(price) as close FROM trades SAMPLE BY 15m FILL(PREV, PREV, PREV, PREV);
but if in 1 hour I don't have any trades
some rows will be filled by PREV
that's fine, but in a kline
the open, close, high, low must all be filled by the PREV of last (the previous close)
if the data is empty
Andrey Pechkurov
10/06/2022, 3:40 PM
Pei
10/07/2022, 1:23 PM
Hà Hữu
10/09/2022, 1:38 AM
Andrey Pechkurov
10/09/2022, 6:13 AM
> can i get the branch ?
Not sure if I got the question. Could you elaborate?
Hà Hữu
10/09/2022, 7:20 AM
Jaromir Hamala
10/09/2022, 7:56 AM
Hà Hữu
10/09/2022, 8:06 AM
Jaromir Hamala
10/09/2022, 8:07 AM
git clone https://github.com/questdb/kafka-questdb-connector.git
Hà Hữu
10/09/2022, 8:12 AM
Jaromir Hamala
10/09/2022, 8:13 AM
Hà Hữu
10/09/2022, 8:15 AM
Jaromir Hamala
10/09/2022, 8:17 AM
Hà Hữu
10/09/2022, 8:17 AM
{
"auto.create": "false",
"auto.evolve": "false",
"connection.password": "quest",
"connection.url": "jdbc:<postgresql://questdb:8812/qdb?useSSL=false>",
"connection.user": "admin",
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"dialect.name": "PostgreSqlDatabaseDialect",
"insert.mode": "insert",
"key.converter.schemas.enable": "true",
"name": "questdb_jdbc_sink",
"pk.mode": "none",
"tasks.max": "20",
"topics": "pg.trades, pg.activities,pg.operations_assets,pg.operations_expenses,pg.operations_liabilities,pg.operations_revenues",
"transforms": "unwrap,dropPrefix, addPrefix, createdAtTimestampConverter, updatedAtTimestampConverter",
"transforms.addPrefix.regex": ".*",
"transforms.addPrefix.replacement": "$0",
"transforms.addPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.createdAtTimestampConverter.field": "created_at",
"transforms.createdAtTimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSS",
"transforms.createdAtTimestampConverter.target.type": "string",
"transforms.createdAtTimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.dropPrefix.regex": "pg.(.*)",
"transforms.dropPrefix.replacement": "$1",
"transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.updatedAtTimestampConverter.field": "updated_at",
"transforms.updatedAtTimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSS",
"transforms.updatedAtTimestampConverter.target.type": "string",
"transforms.updatedAtTimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"value.converter.schemas.enable": "true"
}
Jaromir Hamala
10/09/2022, 8:21 AM
{
"host": "quest",
"connector.class": "io.questdb.kafka.QuestDBSinkConnector",
"key.converter.schemas.enable": "true",
"name": "questdb_ilp_sink",
"tasks.max": "1",
"topics": "pg.trades, pg.activities,pg.operations_assets,pg.operations_expenses,pg.operations_liabilities,pg.operations_revenues",
"transforms": "unwrap,dropPrefix, addPrefix, createdAtTimestampConverter, updatedAtTimestampConverter",
"transforms.addPrefix.regex": ".*",
"transforms.addPrefix.replacement": "$0",
"transforms.addPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.createdAtTimestampConverter.field": "created_at",
"transforms.createdAtTimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSS",
"transforms.createdAtTimestampConverter.target.type": "string",
"transforms.createdAtTimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.dropPrefix.regex": "pg.(.*)",
"transforms.dropPrefix.replacement": "$1",
"transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.updatedAtTimestampConverter.field": "updated_at",
"transforms.updatedAtTimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSS",
"transforms.updatedAtTimestampConverter.target.type": "string",
"transforms.updatedAtTimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"value.converter.schemas.enable": "true"
}
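(Editorial sketch, not from the thread: a flat config map like the one above can be registered through Kafka Connect's REST API. This assumes Connect listens on port 8083 as elsewhere in this thread and that the JSON is saved as questdb_ilp_sink.json, a hypothetical file name; PUT on /connectors/<name>/config creates or updates the connector.)
curl -X PUT -H "Content-Type: application/json" --data @questdb_ilp_sink.json localhost:8083/connectors/questdb_ilp_sink/config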
Hà Hữu
10/09/2022, 8:22 AM
Jaromir Hamala
10/09/2022, 8:22 AM
Hà Hữu
10/09/2022, 8:24 AM
Jaromir Hamala
10/09/2022, 8:24 AM
Hà Hữu
10/09/2022, 8:24 AM
Jaromir Hamala
10/09/2022, 8:28 AM
Hà Hữu
10/09/2022, 8:29 AM
Jaromir Hamala
10/09/2022, 8:29 AM
Hà Hữu
10/09/2022, 8:31 AM
Jaromir Hamala
10/09/2022, 8:33 AM
curl <kafka_connect_host>:8083/connectors/
Hà Hữu
10/09/2022, 8:34 AM
Jaromir Hamala
10/09/2022, 8:35 AM
curl <kafka_connect_host>:8083/connectors ?
Hà Hữu
10/09/2022, 8:35 AM
Jaromir Hamala
10/09/2022, 8:37 AM
curl localhost:8083/connectors/questdb_ilp_sink/status
show tables;
in QuestDB SQL Console?
Hà Hữu
10/09/2022, 8:39 AM
Jaromir Hamala
10/09/2022, 8:41 AM
Hà Hữu
10/09/2022, 8:41 AM
Jaromir Hamala
10/09/2022, 8:41 AM
Hà Hữu
10/09/2022, 8:41 AM
Jaromir Hamala
10/09/2022, 8:42 AM
Hà Hữu
10/09/2022, 8:46 AM
Jaromir Hamala
10/09/2022, 8:46 AM
Hà Hữu
10/09/2022, 8:46 AM
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: io.questdb.cutlass.line.LineSenderException: [32] send error
at io.questdb.cutlass.line.tcp.PlainTcpLineChannel.send(PlainTcpLineChannel.java:100)
at io.questdb.cutlass.line.AbstractLineSender.sendAll(AbstractLineSender.java:380)
at io.questdb.cutlass.line.LineTcpSender.flush(LineTcpSender.java:81)
at io.questdb.kafka.QuestDBSinkTask.put(QuestDBSinkTask.java:52)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
... 10 more
Jaromir Hamala
10/09/2022, 8:52 AM
Hà Hữu
10/09/2022, 8:52 AM
Jaromir Hamala
10/09/2022, 8:53 AM
Hà Hữu
10/09/2022, 8:53 AM
2022-10-09T08:47:08.614483Z E i.q.c.l.t.LineTcpConnectionContext [7] could not process line data [table=activities, msg=cast error for line protocol string [columnWriterIndex=13, columnType=TIMESTAMP], errno=0]
questdb_1 | 2022-10-09T08:47:08.614492Z E i.q.c.l.t.LineTcpConnectionContext [7] could not parse measurement, NONE at 374, line (may be mangled due to partial parsing): 'activities key_id=10484252i,id=10484252i,user_id=5i,category="user",user_ip="58.186.164.249",user_ip_country="VN",user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36",topic="session",action="login",result="succeed",device="fake",created_at="2022-08-13 15:53:54.095",updated_at="2022-08-13 15:53:54.095"'
questdb_1 | 2022-10-09T08:47:08.614504Z I tcp-line-server scheduling disconnect [fd=7, reason=0]
Jaromir Hamala
10/09/2022, 8:55 AM
Hà Hữu
10/09/2022, 8:57 AM
Jaromir Hamala
10/09/2022, 8:57 AM
created_at & updated_at as strings, even though they should be timestamps. that should be solvable via transforms.
Hà Hữu
10/09/2022, 8:58 AM
Jaromir Hamala
10/09/2022, 9:03 AM
1. delete the existing connector:
curl -X DELETE localhost:8083/connectors/questdb_ilp_sink
2. then submit it with a different configuration:
{
"host": "quest",
"connector.class": "io.questdb.kafka.QuestDBSinkConnector",
"key.converter.schemas.enable": "true",
"name": "questdb_ilp_sink",
"tasks.max": "1",
"topics": "pg.trades, pg.activities,pg.operations_assets,pg.operations_expenses,pg.operations_liabilities,pg.operations_revenues",
"transforms": "unwrap,dropPrefix, addPrefix, createdAtTimestampConverter, updatedAtTimestampConverter",
"transforms.addPrefix.regex": ".*",
"transforms.addPrefix.replacement": "$0",
"transforms.addPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.createdAtTimestampConverter.field": "created_at",
"transforms.createdAtTimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSS",
"transforms.createdAtTimestampConverter.target.type": "unix",
"transforms.createdAtTimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.dropPrefix.regex": "pg.(.*)",
"transforms.dropPrefix.replacement": "$1",
"transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.updatedAtTimestampConverter.field": "updated_at",
"transforms.updatedAtTimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSS",
"transforms.updatedAtTimestampConverter.target.type": "Timestamp",
"transforms.updatedAtTimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"value.converter.schemas.enable": "true",
"timestamp.field.name": "created_at"
}
Hà Hữu
10/09/2022, 9:03 AM
Jaromir Hamala
10/09/2022, 9:06 AM
"timestamp.field.name": "created_at"
in connector configuration then the date should be OK
Hà Hữu
10/09/2022, 9:07 AM
Jaromir Hamala
10/09/2022, 9:07 AM
Hà Hữu
10/09/2022, 9:07 AM
Jaromir Hamala
10/09/2022, 9:07 AM
Hà Hữu
10/09/2022, 9:12 AM
Jaromir Hamala
10/09/2022, 9:12 AM
Hà Hữu
10/09/2022, 9:13 AM
Jaromir Hamala
10/09/2022, 9:15 AM
Hà Hữu
10/09/2022, 9:18 AM
Jaromir Hamala
10/09/2022, 9:18 AM
Hà Hữu
10/09/2022, 9:19 AM
Jaromir Hamala
10/09/2022, 9:19 AM
Hà Hữu
10/09/2022, 9:20 AM
Jaromir Hamala
10/09/2022, 9:21 AM
Hà Hữu
10/09/2022, 9:26 AM
Jaromir Hamala
10/09/2022, 9:27 AM
Hà Hữu
10/09/2022, 9:36 AM
{
"column.exclude.list": "public.users.password_digest,public.users.data",
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.dbname": "barong_production",
"database.hostname": "db",
"database.password": "zsmartex",
"database.server.name": "pg",
"database.user": "zsmartex",
"decimal.handling.mode": "double",
"key.converter.schemas.enable": "true",
"name": "pg-barong-connector",
"slot.name": "barong_connector",
"snapshot.mode": "never",
"table.include.list": "public.activities,public.api_keys,public.profiles,public.attachments,public.users",
"time.precision.mode": "connect",
"transforms": "dropPrefix",
"transforms.dropPrefix.regex": "pg.public.(.*)",
"transforms.dropPrefix.replacement": "pg.$1",
"transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"value.converter.schemas.enable": "true"
}
Jaromir Hamala
10/09/2022, 9:40 AM
op
c = create
u = update
d = delete
r = read (applies to only snapshots)
t = truncate
m = message
so you would use a filter to get only messages where op == 'c'
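(Editorial sketch, not from the thread: Debezium ships a message-filtering SMT that can express that condition, assuming the debezium-scripting module and a JSR-223 engine such as Groovy are on the Connect plugin path; filterCreates is a hypothetical transform alias. The skipped.operations approach suggested below is the simpler route.)
"transforms": "filterCreates",
"transforms.filterCreates.type": "io.debezium.transforms.Filter",
"transforms.filterCreates.language": "jsr223.groovy",
"transforms.filterCreates.condition": "value.op == 'c'"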
Hà Hữu
10/09/2022, 10:14 AM
Jaromir Hamala
10/09/2022, 10:55 AM
skipped.operations
see https://debezium.io/documentation/reference/stable/connectors/postgresql.html and search for skipped.operations
{
"column.exclude.list": "public.users.password_digest,public.users.data",
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.dbname": "barong_production",
"database.hostname": "db",
"database.password": "zsmartex",
"database.server.name": "pg",
"database.user": "zsmartex",
"decimal.handling.mode": "double",
"key.converter.schemas.enable": "true",
"name": "pg-barong-connector",
"slot.name": "barong_connector",
"snapshot.mode": "never",
"table.include.list": "public.activities,public.api_keys,public.profiles,public.attachments,public.users",
"time.precision.mode": "connect",
"transforms": "dropPrefix",
"transforms.dropPrefix.regex": "pg.public.(.*)",
"transforms.dropPrefix.replacement": "pg.$1",
"transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"value.converter.schemas.enable": "true"
}
something like "skipped.operations": "u,d,t"
Hà Hữu
10/09/2022, 4:48 PM
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NoClassDefFoundError: sun/misc/FDBigInteger
at io.questdb.std.Numbers.append(Numbers.java:2214)
at io.questdb.std.Numbers.append(Numbers.java:255)
at io.questdb.std.Numbers.append(Numbers.java:211)
at io.questdb.std.str.CharSink.put(CharSink.java:109)
at io.questdb.cutlass.line.AbstractLineSender.field(AbstractLineSender.java:145)
at io.questdb.cutlass.line.AbstractLineSender.doubleColumn(AbstractLineSender.java:151)
at io.questdb.cutlass.line.AbstractLineSender.doubleColumn(AbstractLineSender.java:46)
at io.questdb.kafka.QuestDBSinkTask.tryWritePhysicalTypeFromSchema(QuestDBSinkTask.java:180)
at io.questdb.kafka.QuestDBSinkTask.handleObject(QuestDBSinkTask.java:118)
at io.questdb.kafka.QuestDBSinkTask.handleStruct(QuestDBSinkTask.java:87)
at io.questdb.kafka.QuestDBSinkTask.tryWritePhysicalTypeFromSchema(QuestDBSinkTask.java:196)
at io.questdb.kafka.QuestDBSinkTask.handleObject(QuestDBSinkTask.java:118)
at io.questdb.kafka.QuestDBSinkTask.handleSingleRecord(QuestDBSinkTask.java:63)
at io.questdb.kafka.QuestDBSinkTask.put(QuestDBSinkTask.java:50)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
... 10 more
Jaromir Hamala
10/09/2022, 4:50 PM
Hà Hữu
10/09/2022, 4:51 PM
Jaromir Hamala
10/09/2022, 4:52 PM
Hà Hữu
10/09/2022, 4:53 PM
questdb_1 | 2022-10-09T16:56:07.279604Z E i.q.c.l.t.LineTcpConnectionContext [163] could not process line data [table=operations_liabilities, msg=cast error for line protocol integer [columnWriterIndex=3, columnType=STRING], errno=0]
questdb_1 | 2022-10-09T16:56:07.279613Z E i.q.c.l.t.LineTcpConnectionContext [163] could not parse measurement, NONE at 196, line (may be mangled due to partial parsing): 'operations_liabilities key_id=11i,id=11i,code=212i,member_id=1i,currency_id="bnb",debit=0.015,credit=0.0,reference_type="Withdraw",reference_id=12i,updated_at=1660746601670000t 1660746601670000000'
Jaromir Hamala
10/09/2022, 4:59 PM
Hà Hữu
10/09/2022, 5:00 PM
Jaromir Hamala
10/09/2022, 5:01 PM
memberId
as string, but it looks like it's an Integer in Kafka/Postgres?
Hà Hữu
10/09/2022, 5:01 PM
Jaromir Hamala
10/09/2022, 5:01 PM
Hà Hữu
10/09/2022, 5:02 PM
Jaromir Hamala
10/09/2022, 5:03 PM
null
should do it. but perhaps it's a common use-case so I can add explicit support for this to the Quest ILP connector
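(Editorial sketch, not what was settled on in the thread: Kafka Connect's built-in Cast SMT could coerce member_id to a string before the sink writes it, so ILP sends a value matching the existing STRING column; castMemberId is a hypothetical transform alias that would be appended to the sink's transforms list.)
"transforms.castMemberId.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.castMemberId.spec": "member_id:string"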
Hà Hữu
10/09/2022, 5:08 PM
Jaromir Hamala
10/09/2022, 5:10 PM
include.key
when you set it to false
then message keys won't be included in questdb tables.
see this for all options:
https://github.com/questdb/kafka-questdb-connector#configuration
Hà Hữu
10/09/2022, 5:37 PM
Jaromir Hamala
10/09/2022, 5:39 PM
Hà Hữu
10/09/2022, 5:39 PM
Jaromir Hamala
10/09/2022, 5:40 PM
Hà Hữu
10/09/2022, 5:46 PM
Jaromir Hamala
10/09/2022, 5:51 PM
Hà Hữu
10/09/2022, 5:54 PM
Jaromir Hamala
10/09/2022, 6:22 PM