1
0
Fork 0

Track aggregates and expose via API

main
Kevin Belisle 2021-07-09 16:28:48 -04:00
parent 4c9e819ef9
commit 6512f3ca41
4 changed files with 142 additions and 46 deletions

View File

@ -8,6 +8,7 @@ import kotlinx.serialization.*
import kotlinx.serialization.json.Json import kotlinx.serialization.json.Json
import org.jetbrains.exposed.sql.* import org.jetbrains.exposed.sql.*
import org.jetbrains.exposed.sql.transactions.* import org.jetbrains.exposed.sql.transactions.*
import xyz.etztech.stonks.dsl.AggregateStatistics
import xyz.etztech.stonks.dsl.LiveStatistics import xyz.etztech.stonks.dsl.LiveStatistics
import xyz.etztech.stonks.dsl.Players import xyz.etztech.stonks.dsl.Players
import xyz.etztech.stonks.dsl.Statistics import xyz.etztech.stonks.dsl.Statistics
@ -173,6 +174,34 @@ fun initApiServer(apiServerPort: Int, database: Database) {
} }
} }
app.get("api/aggregates") { ctx ->
run {
transaction(database) {
addLogger(StdOutSqlLogger)
val maxValueExpr = AggregateStatistics.value.max()
val aggregates =
AggregateStatistics.slice(
AggregateStatistics.type,
AggregateStatistics.name,
maxValueExpr
)
.selectAll()
.groupBy(AggregateStatistics.type, AggregateStatistics.name)
.map {
AggregateValue(
it[AggregateStatistics.type],
it[AggregateStatistics.name],
it[maxValueExpr]!!
)
}
ctx.result(Json { prettyPrint = true }.encodeToString(aggregates))
}
}
}
app.get("api/*") { ctx -> ctx.status(404) } app.get("api/*") { ctx -> ctx.status(404) }
println("Javalin web server started") println("Javalin web server started")
@ -199,3 +228,13 @@ data class HistoricalStatisticValue(
val timestamp: String, val timestamp: String,
val value: Long val value: Long
) )
@Serializable data class AggregateValue(val type: String, val name: String, val value: Long)
@Serializable
data class HistoricalAggregateValue(
val type: String,
val name: String,
val timestamp: String,
val value: Long
)

View File

@ -10,13 +10,13 @@ import org.jetbrains.exposed.sql.*
import org.jetbrains.exposed.sql.transactions.TransactionManager import org.jetbrains.exposed.sql.transactions.TransactionManager
import org.jetbrains.exposed.sql.transactions.transaction import org.jetbrains.exposed.sql.transactions.transaction
import xyz.etztech.stonks.api.initApiServer import xyz.etztech.stonks.api.initApiServer
import xyz.etztech.stonks.dsl.AggregateStatistics
import xyz.etztech.stonks.dsl.LiveStatistics import xyz.etztech.stonks.dsl.LiveStatistics
import xyz.etztech.stonks.dsl.Players import xyz.etztech.stonks.dsl.Players
import xyz.etztech.stonks.dsl.Statistics import xyz.etztech.stonks.dsl.Statistics
import xyz.etztech.stonks.statisticsimporter.StatisticsImporter import xyz.etztech.stonks.statisticsimporter.StatisticsImporter
fun main() = fun main() = runBlocking {
runBlocking {
println("Starting Stonks...") println("Starting Stonks...")
val fis = FileInputStream("./stonks.config") val fis = FileInputStream("./stonks.config")
@ -39,7 +39,8 @@ fun main() =
databaseName, databaseName,
h2TcpServerPort, h2TcpServerPort,
h2StartWebServer, h2StartWebServer,
h2tWebServerPort) h2tWebServerPort
)
initApiServer(apiServerPort, database) initApiServer(apiServerPort, database)
@ -48,7 +49,7 @@ fun main() =
delay(60 * 1000L) delay(60 * 1000L)
println("END") println("END")
} }
fun initH2Server( fun initH2Server(
databaseBaseDir: String, databaseBaseDir: String,
@ -59,11 +60,20 @@ fun initH2Server(
): Database { ): Database {
val webServer = val webServer =
Server.createWebServer( Server.createWebServer(
"-trace", "-baseDir", databaseBaseDir, "-webPort", h2WebServerPort.toString()) "-baseDir",
databaseBaseDir,
"-webPort",
h2WebServerPort.toString()
)
val tcpServer = val tcpServer =
Server.createTcpServer( Server.createTcpServer(
"-trace", "-baseDir", databaseBaseDir, "-webPort", h2TcpServerPort.toString()) "-trace",
"-baseDir",
databaseBaseDir,
"-webPort",
h2TcpServerPort.toString()
)
if (h2StartWebServer) { if (h2StartWebServer) {
webServer.start() webServer.start()
@ -79,6 +89,7 @@ fun initH2Server(
addLogger(StdOutSqlLogger) addLogger(StdOutSqlLogger)
SchemaUtils.create(Statistics) SchemaUtils.create(Statistics)
SchemaUtils.create(LiveStatistics) SchemaUtils.create(LiveStatistics)
SchemaUtils.create(AggregateStatistics)
SchemaUtils.create(Players) SchemaUtils.create(Players)
// Create indexes with explicit SQL because I can't figure out how to do it with exposed // Create indexes with explicit SQL because I can't figure out how to do it with exposed
@ -107,12 +118,11 @@ fun initH2Server(
return database return database
} }
suspend fun initPeriodicFetching(interval: Long, folder: String, db: Database) = suspend fun initPeriodicFetching(interval: Long, folder: String, db: Database) = coroutineScope {
coroutineScope {
launch { launch {
while (true) { while (true) {
StatisticsImporter.importStatistics(folder, db) StatisticsImporter.importStatistics(folder, db)
delay(interval) delay(interval)
} }
} }
} }

View File

@ -4,16 +4,6 @@ import java.time.Instant
import org.jetbrains.exposed.sql.* import org.jetbrains.exposed.sql.*
import org.jetbrains.exposed.sql.`java-time`.timestamp import org.jetbrains.exposed.sql.`java-time`.timestamp
object LiveStatistics : Table() {
val playerId: Column<String> = varchar("PlayerId", 150)
val type: Column<String> = varchar("Type", 150)
val name: Column<String> = varchar("Name", 150)
val value: Column<Long> = long("Value")
val rank: Column<Int> = integer("Rank")
override val primaryKey = PrimaryKey(playerId, type, name, name = "PK_playerId_type_name")
}
object Statistics : Table() { object Statistics : Table() {
val playerId: Column<String> = varchar("PlayerId", 150) val playerId: Column<String> = varchar("PlayerId", 150)
val type: Column<String> = varchar("Type", 150) val type: Column<String> = varchar("Type", 150)
@ -25,6 +15,25 @@ object Statistics : Table() {
PrimaryKey(playerId, type, name, timestamp, name = "PK_playerId_type_name_timestamp") PrimaryKey(playerId, type, name, timestamp, name = "PK_playerId_type_name_timestamp")
} }
object LiveStatistics : Table() {
val playerId: Column<String> = varchar("PlayerId", 150)
val type: Column<String> = varchar("Type", 150)
val name: Column<String> = varchar("Name", 150)
val value: Column<Long> = long("Value")
val rank: Column<Int> = integer("Rank")
override val primaryKey = PrimaryKey(playerId, type, name, name = "PK_playerId_type_name")
}
object AggregateStatistics : Table() {
val type: Column<String> = varchar("Type", 150)
val name: Column<String> = varchar("Name", 150)
val timestamp: Column<Instant> = timestamp("Timestamp")
val value: Column<Long> = long("Value")
override val primaryKey = PrimaryKey(type, name, timestamp, name = "PK_type_name_timestamp")
}
object Players : Table() { object Players : Table() {
val id: Column<String> = varchar("Id", 150) val id: Column<String> = varchar("Id", 150)
val name: Column<String> = varchar("Name", 150) val name: Column<String> = varchar("Name", 150)

View File

@ -30,6 +30,8 @@ object StatisticsImporter {
File(folder).listFiles().forEach { readFile(it, database) } File(folder).listFiles().forEach { readFile(it, database) }
println("Updating live statistics table...") println("Updating live statistics table...")
updateLiveStatistics(database) updateLiveStatistics(database)
println("Updating aggregate statistics table...")
updateAggregateStatistics(database)
println("Refreshing player names...") println("Refreshing player names...")
refreshPlayerNames(database) refreshPlayerNames(database)
} }
@ -123,6 +125,42 @@ object StatisticsImporter {
} }
} }
fun updateAggregateStatistics(database: Database) {
transaction(database) {
TransactionManager.current()
.exec(
"""
SET @Timestamp = CURRENT_TIMESTAMP;
INSERT INTO AGGREGATESTATISTICS ("Type", "Name", "Timestamp", "Value")
SELECT Live."Type",
'',
@Timestamp AS "Timestamp",
sum(Live."Value")
FROM LIVESTATISTICS as Live
LEFT JOIN AGGREGATESTATISTICS as Agg
ON Live."Type" = Agg."Type"
WHERE array_contains(array['minecraft:mined'], Live."Type")
GROUP BY Live."Type"
HAVING sum(Live."Value") <> max(Agg."Value");
INSERT INTO AGGREGATESTATISTICS ("Type", "Name", "Timestamp", "Value")
SELECT Live."Type",
Live."Name",
@Timestamp AS "Timestamp",
Sum(Live."Value")
FROM livestatistics as Live
LEFT JOIN AGGREGATESTATISTICS as Agg
ON Live."Type" = Agg."Type" AND Live."Name" = Agg."Name"
WHERE array_contains(array['minecraft:animals_bred', 'minecraft:play_one_minute', 'minecraft:deaths', 'minecraft:player_kills', 'minecraft:aviate_one_cm', 'minecraft:boat_one_cm', 'minecraft:crouch_one_cm', 'minecraft:horse_one_cm', 'minecraft:minecart_one_cm', 'minecraft:sprint_one_cm', 'minecraft:strider_one_cm', 'minecraft:swim_one_cm', 'minecraft:walk_on_water_one_cm', 'minecraft:walk_one_cm', 'minecraft:walk_under_water_one_cm' ], Live."Name")
OR array_contains(array['minecraft:killed', 'minecraft:killed_by'], Live."Type")
GROUP BY Live."Type", Live."Name"
HAVING sum(Live."Value") <> max(Agg."Value");
""".trimIndent()
)
}
}
fun refreshPlayerNames(database: Database) { fun refreshPlayerNames(database: Database) {
transaction(database) { transaction(database) {
addLogger(StdOutSqlLogger) addLogger(StdOutSqlLogger)