javier ramirez
11/10/2022, 4:36 PM
Franco Almonacid
11/10/2022, 4:45 PM
javier ramirez
11/10/2022, 4:51 PM
Franco Almonacid
11/10/2022, 4:54 PM
Franco Almonacid
11/10/2022, 5:17 PM
Franco Almonacid
11/10/2022, 5:18 PM
{'e': 'bookTicker', 'u': 2140784669005, 's': 'ETHUSDT', 'b': '1315.26', 'B': '7.192', 'a': '1315.27', 'A': '0.846', 'T': 1668100270152, 'E': 1668100270157}
{'e': 'bookTicker', 'u': 2140784669134, 's': 'SOLUSDT', 'b': '17.2260', 'B': '13', 'a': '17.2300', 'A': '51', 'T': 1668100270154, 'E': 1668100270158}
{'e': 'bookTicker', 'u': 2140784669073, 's': 'ETHUSDT', 'b': '1315.26', 'B': '7.733', 'a': '1315.27', 'A': '0.846', 'T': 1668100270153, 'E': 1668100270157}
{'e': 'bookTicker', 'u': 2140784669097, 's': 'ETHUSDT', 'b': '1315.26', 'B': '7.733', 'a': '1315.28', 'A': '5.747', 'T': 1668100270153, 'E': 1668100270157}
{'e': 'bookTicker', 'u': 2140784669102, 's': 'ETHUSDT', 'b': '1315.26', 'B': '8.274', 'a': '1315.28', 'A': '5.747', 'T': 1668100270153, 'E': 1668100270158}
{'e': 'bookTicker', 'u': 2140784669156, 's': 'ETHUSDT', 'b': '1315.26', 'B': '8.274', 'a': '1315.27', 'A': '4.480', 'T': 1668100270154, 'E': 1668100270158}
{'e': 'bookTicker', 'u': 2140784669157, 's': 'ETHUSDT', 'b': '1315.26', 'B': '7.733', 'a': '1315.27', 'A': '4.480', 'T': 1668100270154, 'E': 1668100270158}
{'e': 'bookTicker', 'u': 2140784669195, 's': 'ETHUSDT', 'b': '1315.26', 'B': '8.352', 'a': '1315.27', 'A': '4.480', 'T': 1668100270155, 'E': 1668100270158}
Franco Almonacid
11/10/2022, 5:18 PM
E field
Franco Almonacid
11/10/2022, 5:19 PM
SOLUSDT has a different timestamp
Franco Almonacid
11/10/2022, 5:20 PM
javier ramirez
11/10/2022, 5:23 PM
Sebastián Torrealba
11/10/2022, 5:43 PM
javier ramirez
11/10/2022, 5:47 PM
Vlad
11/11/2022, 9:38 AM
Vlad
11/11/2022, 9:39 AMSo any recommendation, how about store each symbol (ordered) in a different table?table name is in your publishing code, just change that i guess?
Franco Almonacid
11/11/2022, 11:09 AM
Vlad
11/11/2022, 7:20 PM
Jiri Pokorny
11/14/2022, 12:24 PM
/tmp/cairo/writer/table/2022-11-12T10.40773157
^^^
Shubham Jain
11/20/2022, 6:26 AM
{"level":"info","msg":"inserting 15375 candles..."}
{"level":"info","msg":"inserted 15375 candles in 1.783808583s"}
{"level":"info","msg":"flushed in 56.125µs"}
• My server config - m6a.xlarge
• Using a GP3 SSD with 3000 IOPS; I confirmed only ~350 write ops/sec, so I assume it's not I/O throttled
• Commit lag is set to the server default
• Using the official QuestDB AMI
Could writing 15k candles and then flushing them be the reason for this degraded performance? If so, wouldn't flushing for each candle also introduce lag because of the individual acks, or am I missing something else here?
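For context, ILP over TCP is fire-and-forget: the server sends no per-row acknowledgement, so a per-row flush does not wait on an ack, but it does add per-call network and syscall overhead. A hedged sketch of a middle ground, batching a fixed number of rows per flush, shown with the Python client (table and column names are illustrative; the same idea applies to the other ILP clients):

from questdb.ingress import Sender

def insert_candles(candles, host='localhost', port=9009, batch_size=1000):
    with Sender(host=host, port=port) as sender:
        for i, candle in enumerate(candles, start=1):
            sender.row('candles', columns=candle)
            # Flush every batch_size rows instead of once per row or only once at the very end.
            if i % batch_size == 0:
                sender.flush()
        # The context manager flushes any remaining buffered rows on exit.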
11/24/2022, 1:04 AMFurkan Küçük
12/05/2022, 3:02 PM
{
"name": "questdb-sink",
"config": {
"connector.class": "io.questdb.kafka.QuestDBSinkConnector",
"host": "localhost:9009",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
}
}
I am sending the config via this command:
curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" --data @sink_conf.json http://localhost:8083/connectors
This gives me the following error:
{
"error_code": 500,
"message": "Unexpected character ('}' (code 125)): was expecting double-quote to start field name\n at [Source: (org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$UnCloseableInputStream); line: 1, column: 276] (through reference chain: org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest[\"config\"])"
}
I couldn't figure out where my fault is. Any help is appreciated.
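The Connect error above points at a '}' where a field name was expected; strict JSON does not allow trailing commas, so the comma after the "value.converter" entry in sink_conf.json is the likely culprit. A corrected version of the config (values unchanged from the original; note that a sink connector also needs a "topics" or "topics.regex" entry, added here as a placeholder):

{
  "name": "questdb-sink",
  "config": {
    "connector.class": "io.questdb.kafka.QuestDBSinkConnector",
    "host": "localhost:9009",
    "topics": "your_topic",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}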
Furkan Küçük
12/06/2022, 7:30 AM
FROM ubuntu:latest AS builder
WORKDIR /opt
RUN apt-get update && apt-get install -y curl wget unzip jq
RUN curl -s https://api.github.com/repos/questdb/kafka-questdb-connector/releases/latest | jq -r '.assets[]|select(.content_type == "application/zip")|.browser_download_url'|wget -qi -
RUN unzip kafka-questdb-connector-*-bin.zip
FROM confluentinc/cp-kafka-connect:7.2.2
COPY --from=builder /opt/kafka-questdb-connector/*.jar /kafka/connect/questdb-connector/
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.5.4
USER 1001
However, I am getting this error:
{"error_code":500,"message":"Failed to find any class that implements Connector and which name matches io.questdb.kafka.QuestDBSinkConnector, available connectors are: PluginDesc{klass=class io.confluent.connect.jdbc.JdbcSinkConnector, name='io.confluent.connect.jdbc.JdbcSinkConnector', version='10.5.4', encodedVersion=10.5.4, type=sink, typeName='sink', location='file:/usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/'}, PluginDesc{klass=class io.confluent.connect.jdbc.JdbcSourceConnector, name='io.confluent.connect.jdbc.JdbcSourceConnector', version='10.5.4', encodedVersion=10.5.4, type=source, typeName='source', location='file:/usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorCheckpointConnector, name='org.apache.kafka.connect.mirror.MirrorCheckpointConnector', version='7.2.2-ccs', encodedVersion=7.2.2-ccs, type=source, typeName='source', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorHeartbeatConnector, name='org.apache.kafka.connect.mirror.MirrorHeartbeatConnector', version='7.2.2-ccs', encodedVersion=7.2.2-ccs, type=source, typeName='source', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorSourceConnector, name='org.apache.kafka.connect.mirror.MirrorSourceConnector', version='7.2.2-ccs', encodedVersion=7.2.2-ccs, type=source, typeName='source', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='7.2.2-ccs', encodedVersion=7.2.2-ccs, type=sink, typeName='sink', location='file:/usr/share/java/confluent-control-center/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='7.2.2-ccs', encodedVersion=7.2.2-ccs, type=source, typeName='source', location='file:/usr/share/java/confluent-control-center/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='7.2.2-ccs', encodedVersion=7.2.2-ccs, type=source, typeName='source', location='file:/usr/share/java/confluent-control-center/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='7.2.2-ccs', encodedVersion=7.2.2-ccs, type=sink, typeName='sink', location='file:/usr/share/java/confluent-control-center/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='7.2.2-ccs', encodedVersion=7.2.2-ccs, type=source, typeName='so
Furkan Küçük
12/09/2022, 9:53 PM
{
"name": "questdb-sink",
"config": {
"connector.class": "io.questdb.kafka.QuestDBSinkConnector",
"topics": "book,candles,ticker,trades",
"host": "crypto_questdb",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
}
}
With this, Kafka Connect actually connects, gets only one message from Kafka, and then throws the exception below:
[2022-12-09 21:43:47,413] ERROR WorkerSinkTask{id=questdb-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: [104] send error (org.apache.kafka.connect.runtime.WorkerSinkTask)
io.questdb.cutlass.line.LineSenderException: [104] send error
at io.questdb.cutlass.line.tcp.PlainTcpLineChannel.send(PlainTcpLineChannel.java:100)
at io.questdb.cutlass.line.AbstractLineSender.sendAll(AbstractLineSender.java:380)
at io.questdb.cutlass.line.LineTcpSender.flush(LineTcpSender.java:81)
at io.questdb.kafka.QuestDBSinkTask.put(QuestDBSinkTask.java:75)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
[2022-12-09 21:43:47,421] WARN WorkerSinkTask{id=questdb-sink-0} Offset commit failed during close (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2022-12-09 21:43:47,421] ERROR WorkerSinkTask{id=questdb-sink-0} Commit of offsets threw an unexpected exception for sequence number 1: null (org.apache.kafka.connect.runtime.WorkerSinkTask)
io.questdb.cutlass.line.LineSenderException: [32] send error
at io.questdb.cutlass.line.tcp.PlainTcpLineChannel.send(PlainTcpLineChannel.java:100)
at io.questdb.cutlass.line.AbstractLineSender.sendAll(AbstractLineSender.java:380)
at io.questdb.cutlass.line.LineTcpSender.flush(LineTcpSender.java:81)
at io.questdb.kafka.QuestDBSinkTask.flush(QuestDBSinkTask.java:284)
at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:125)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:404)
at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:646)
at org.apache.kafka.connect.runtime.WorkerSinkTask.closeAllPartitions(WorkerSinkTask.java:641)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
[2022-12-09 21:43:47,422] ERROR WorkerSinkTask{id=questdb-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: io.questdb.cutlass.line.LineSenderException: [104] send error
at io.questdb.cutlass.line.tcp.PlainTcpLineChannel.send(PlainTcpLineChannel.java:100)
at io.questdb.cutlass.line.AbstractLineSender.sendAll(AbstractLineSender.java:380)
at io.questdb.cutlass.line.LineTcpSender.flush(LineTcpSender.java:81)
at io.questdb.kafka.QuestDBSinkTask.put(QuestDBSinkTask.java:75)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
... 10 more
[2022-12-09 21:43:47,423] WARN Could not stop task (org.apache.kafka.connect.runtime.WorkerSinkTask)
io.questdb.cutlass.line.LineSenderException: [32] send error
at io.questdb.cutlass.line.tcp.PlainTcpLineChannel.send(PlainTcpLineChannel.java:100)
at io.questdb.cutlass.line.AbstractLineSender.sendAll(AbstractLineSender.java:380)
at io.questdb.cutlass.line.LineTcpSender.flush(LineTcpSender.java:81)
at io.questdb.cutlass.line.AbstractLineSender.close(AbstractLineSender.java:111)
at io.questdb.kafka.QuestDBSinkTask.stop(QuestDBSinkTask.java:289)
at org.apache.kafka.connect.runtime.WorkerSinkTask.close(WorkerSinkTask.java:171)
at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:167)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:199)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
[2022-12-09 21:43:47,426] INFO [Consumer clientId=connector-consumer-questdb-sink-0, groupId=connect-questdb-sink] Revoke previously assigned partitions book-0, candles-0, ticker-0, trades-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-12-09 21:43:47,427] INFO [Consumer clientId=connector-consumer-questdb-sink-0, groupId=connect-questdb-sink] Member connector-consumer-questdb-sink-0-40875dda-9812-476c-8617-52a20314b30f sending LeaveGroup request to coordinator SOME_IP:SOME_PORT (id: 2147483647 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
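For reference, [104] and [32] are the OS errno values for "connection reset by peer" and "broken pipe", i.e. QuestDB closed the TCP connection on its side; ILP over TCP reports no errors back to the client, so the reason usually only shows up in the QuestDB server log. Assuming the database runs as the crypto_questdb container/service referenced in the "host" setting, something like the following would show it:

docker logs --tail 200 crypto_questdb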
Furkan Küçük
12/12/2022, 9:37 AM
{
  "exchange": "BINANCE_FUTURES",
  "symbol": "MATIC-USDT-PERP",
  "delta": {
    "bid": [[0.8843, 60015], [0.8844, 54558], [0.8846, 29971]],
    "ask": [[0.8849, 0], [0.8852, 56019], [0.8853, 53812], [0.8863, 73312], [0.8865, 95046]]
  },
  "timestamp": 1670837182.905,
  "receipt_timestamp": 1670837183.024427
}
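If the nested bid/ask arrays are what the sink cannot map to columns, one possible workaround is to explode each price level into its own flat record before producing to Kafka, so every value maps to a simple column; the field names below are illustrative:

def flatten_book_update(msg: dict):
    # Yield one flat record per price level in the delta.
    for side in ('bid', 'ask'):
        for price, size in msg['delta'].get(side, []):
            yield {
                'exchange': msg['exchange'],
                'symbol': msg['symbol'],
                'side': side,
                'price': price,
                'size': size,
                'timestamp': msg['timestamp'],
                'receipt_timestamp': msg['receipt_timestamp'],
            }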
sam
12/13/2022, 10:35 PM
sam
12/13/2022, 10:35 PM
import time

import pandas as pd
from questdb.ingress import Sender

latest_file = 'ohlc_daily.csv'  # placeholder: the real path is defined elsewhere in the original script

df = pd.read_csv(latest_file).reindex()
##print(df)


def insert_row(host: str = '192.168.1.15', port: int = 9009):
    table_name: str = 'hist-ohlc-daily'  # TABLE NAME
    watermark = 1024  # Flush if the internal buffer exceeds 1KiB
    with Sender(host=host, port=port, auto_flush=watermark) as sender:
        total_rows = 0
        last_flush = time.monotonic()
        print('Inserting row...')
        for index, row in df.iterrows():
            # Print the values of each row
            #print(f"Row {index}: {row}")
            print(row.to_dict())
            sender.row(
                table_name,
                columns=row.to_dict())
            total_rows += 1
            # If the internal buffer is empty, then auto-flush triggered.
            if len(sender) == 0:
                print('Auto-flush triggered.')
                last_flush = time.monotonic()
            # Flush at least once every 30 seconds.
            if time.monotonic() - last_flush > 30:
                print('Timer-flushing triggered.')
                sender.flush()
                last_flush = time.monotonic()
    print(f"table: {table_name}, total rows sent: {total_rows}")
    print("(wait commitLag for all rows to be available)")
    print("bye!")


if __name__ == '__main__':
    insert_row()
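One possible refinement, sketched on the assumption that the CSV's datetime column parses with pandas: pass it as the row's at argument so it becomes the designated timestamp instead of a server-assigned one (at and TimestampNanos are part of the questdb Python client):

import pandas as pd
from questdb.ingress import TimestampNanos

# Inside the loop above, instead of sending every CSV column as a regular column:
cols = row.to_dict()
ts = TimestampNanos(pd.Timestamp(cols.pop('datetime')).value)  # CSV datetime -> nanoseconds since the epoch
sender.row(table_name, columns=cols, at=ts)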
sam
12/13/2022, 10:36 PM
Error: File "src/questdb/ingress.pyx", line 1289, in questdb.ingress.Sender.row
File "src/questdb/ingress.pyx", line 764, in questdb.ingress.Buffer.row
File "src/questdb/ingress.pyx", line 667, in questdb.ingress.Buffer._row
File "src/questdb/ingress.pyx", line 602, in questdb.ingress.Buffer._may_trigger_row_complete
File "src/questdb/ingress.pyx", line 341, in questdb.ingress.may_flush_on_row_complete
File "src/questdb/ingress.pyx", line 1340, in questdb.ingress.Sender.flush
File "src/questdb/ingress.pyx", line 1330, in questdb.ingress.Sender.flush
questdb.ingress.IngressError: Could not flush buffer: Broken pipe (os error 32) - See https://py-questdb-client.readthedocs.io/en/v1.0.2/troubleshooting.html#inspecting-and-debugging-errors#flush-failed
sam
12/13/2022, 10:37 PM
CREATE TABLE 'hist-ohlc-daily' (
  open DOUBLE,
  high DOUBLE,
  low DOUBLE,
  close DOUBLE,
  volume INT,
  datetime TIMESTAMP,
  symbol STRING
);
sam
12/15/2022, 7:05 AM
latest by over a table requires designated TIMESTAMP
sam
12/15/2022, 7:05 AM
CREATE TABLE 'hist-ohlc-daily-stg' (
  open DOUBLE,
  high DOUBLE,
  low DOUBLE,
  close DOUBLE,
  volume INT,
  datetime TIMESTAMP,
  symbol STRING
);
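Neither CREATE TABLE above designates a timestamp column, which is what the "latest by over a table requires designated TIMESTAMP" error refers to. A sketch of the staging table with datetime designated (the partitioning choice is illustrative; at the time an existing table could not be altered to add a designated timestamp, so data would need to be copied into a table created like this):

CREATE TABLE 'hist-ohlc-daily-stg' (
  open DOUBLE,
  high DOUBLE,
  low DOUBLE,
  close DOUBLE,
  volume INT,
  datetime TIMESTAMP,
  symbol STRING
) TIMESTAMP(datetime) PARTITION BY MONTH;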
sam
12/15/2022, 7:21 AM
sam
12/15/2022, 7:21 AM