John Lewis
03/04/2023, 1:51 PMHolger
03/04/2023, 2:34 PMNicolas Hourcard
03/04/2023, 2:54 PMImre
03/04/2023, 3:42 PM
# load data as dataframe
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
from pyspark.sql.window import Window

# Query sent to QuestDB: per-minute traded volume and mid price
# ((max + min) / 2, rounded to 2 decimals) for BTC-USD.
query = (
    "SELECT symbol, sum(amount) as volume, "
    "round((max(price)+min(price))/2, 2) as mid, timestamp as ts "
    "FROM trades WHERE symbol = 'BTC-USD' "
    "SAMPLE BY 1m ALIGN to CALENDAR"
)

# The PostgreSQL JDBC driver jar must be on Spark's classpath so the
# "jdbc" data source can load org.postgresql.Driver.
spark = (
    SparkSession.builder.appName("spark_questdb")
    .config("spark.jars", "postgresql-42.5.1.jar")
    .getOrCreate()
)

# Read the query result through the JDBC data source.
# The URL must be a plain JDBC URL (no angle brackets around it).
df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://localhost:8812/questdb")
    .option("driver", "org.postgresql.Driver")
    .option("user", "admin")
    .option("password", "quest")
    .option("query", query)
    .load()
)

# Add a 30-row moving average of `mid` as a new column.
# - orderBy("ts") makes the frame deterministic; without an ordering the
#   rows that land in each frame depend on incidental row order.
# - partitionBy("symbol") keeps the average per symbol (the query only
#   returns BTC-USD, so this does not change the result here).
# - rowsBetween(-29, currentRow) covers the current row plus the 29
#   preceding rows, i.e. 30 one-minute samples. The frame counts rows,
#   not wall-clock minutes, so gaps in the sampled data widen the span.
window30 = (
    Window.partitionBy("symbol")
    .orderBy("ts")
    .rowsBetween(-29, Window.currentRow)
)
df = df.withColumn("ma30", avg(df.mid).over(window30))

# Print the first 3 rows without truncating column values.
df.show(3, False)
+-------+------------------+------------------+-------------------+------------------+
|symbol |volume |mid |ts |ma30 |
+-------+------------------+------------------+-------------------+------------------+
|BTC-USD|28.879045500000096|23133.29 |2023-02-01 00:00:00|23133.29 |
|BTC-USD|28.832535900000032|23130.18 |2023-02-01 00:01:00|23131.735 |
|BTC-USD|44.71088651999993 |23136.61 |2023-02-01 00:02:00|23133.36 |
John Lewis
03/05/2023, 9:39 AM