cosmin
01/14/2023, 11:02 PMImre
01/16/2023, 1:27 AMasync def write(self, data):
d = self.format(data)
timestamp = data["timestamp"]
received_timestamp_int = int(data["receipt_timestamp"] * 1_000_000)
timestamp_int = int(timestamp * 1_000_000_000) if timestamp is not None else received_timestamp_int * 1000
update = f'{self.key}-{data["exchange"]},symbol={data["symbol"]} {d},receipt_timestamp={received_timestamp_int}t {timestamp_int}'
await self.queue.put(update)
Adam Cimarosti
01/16/2023, 7:46 AMquestdb.ingress.Buffer
object on its own.Imre
01/16/2023, 9:57 AMcosmin
01/16/2023, 10:55 AMImre
01/16/2023, 12:06 PMmultiprocessing
on.
it might helps, assuming you are subscribed to multiple market data feeds.
multiprocessing
is off by default (backend.py):
class BackendQueue:
def start(self, loop: asyncio.AbstractEventLoop, multiprocess=False):
if hasattr(self, 'started') and self.started:
# prevent a backend callback from starting more than 1 writer and creating more than 1 queue
return
self.multiprocess = multiprocess
if self.multiprocess:
self.queue = Pipe(duplex=False)
self.worker = Process(target=BackendQueue.worker, args=(self.writer,), daemon=True)
self.worker.start()
else:
self.queue = Queue()
self.worker = loop.create_task(self.writer())
self.started = True
cryptofeed
to another instance might be a good idea too, especially if there are no free cores for the exchange connections.
it depends on the number of processes will be started by cryptofeed, in other words the number of data feeds.class QuestCallback(SocketCallback):
def __init__(self, host='127.0.0.1', port=9009, key=None, **kwargs):
super().__init__(f"tcp://{host}", port=port, **kwargs)
cosmin
01/16/2023, 10:26 PM