Skip to content

Commit

Permalink
chore(indexer, infra): use deadpool to manage rds proxy connections (#461)
Browse files Browse the repository at this point in the history

* chore(indexer, infra): use deadpool to manage rds proxy connections

* fix(infra): fix build types for indexer infra

* fix(infra): fix indexer params
  • Loading branch information
tim-schultz authored Nov 8, 2023
1 parent 08567ba commit 2d44885
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 48 deletions.
34 changes: 34 additions & 0 deletions indexer/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
deadpool-postgres = "0.11.0"
dotenv = "0.15.0"
ethers = { version = "2.0.10", features = ["abigen", "ws"] }
eyre = "0.6.8"
Expand Down
18 changes: 8 additions & 10 deletions indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,20 +92,18 @@ async fn format_and_save_x_stake_event(
Ok(())
}

/// Read a required environment variable, panicking with a clear message
/// when it is unset (or not valid UTF-8). Intended for startup-time
/// configuration where a missing variable should abort the process.
pub fn get_env(var: &str) -> String {
    match env::var(var) {
        Ok(value) => value,
        Err(_) => panic!("Required environment variable \"{}\" not set", var),
    }
}

#[tokio::main]
async fn main() -> Result<()> {
dotenv().ok();

let get_env = |var| {
env::var(var).map_err(|_| panic!("Required environment variable \"{}\" not set", var))
};

let rpc_url = get_env("RPC_URL").unwrap();

let database_url = get_env("DATABASE_URL").unwrap();
let rpc_url = get_env("RPC_URL");

let f1 = listen_for_blocks(&rpc_url);
let f2 = listen_for_stake_events(&rpc_url, &database_url);
let f2 = listen_for_stake_events(&rpc_url);

try_join!(f1, f2)?;

Expand All @@ -129,7 +127,7 @@ async fn listen_for_blocks(rpc_url: &str) -> Result<()> {
return Ok(());
}

async fn listen_for_stake_events(rpc_url: &str, database_url: &str) -> Result<()> {
async fn listen_for_stake_events(rpc_url: &str) -> Result<()> {
let provider = Provider::<Ws>::connect(rpc_url).await?;

let id_staking_address = "0x0E3efD5BE54CC0f4C64e0D186b0af4b7F2A0e95F".parse::<Address>()?;
Expand All @@ -139,7 +137,7 @@ async fn listen_for_stake_events(rpc_url: &str, database_url: &str) -> Result<()

let current_block = client.get_block_number().await?;

let postgres_client = PostgresClient::new(&database_url).await?;
let postgres_client = PostgresClient::new().await?;

// This is the block number from which we want to start querying events. Either the contract initiation or the last block we queried.
let query_start_block = postgres_client.get_latest_block().await?;
Expand Down
44 changes: 28 additions & 16 deletions indexer/src/postgres.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,34 @@
use std::str::FromStr;

use deadpool_postgres::{Manager, ManagerConfig, Pool, RecyclingMethod};
use rust_decimal::prelude::*;
use tokio;
use tokio_postgres::{Client, Error, NoTls};
use tokio_postgres::{Error, NoTls};

use crate::CONTRACT_START_BLOCK;
use crate::{get_env, CONTRACT_START_BLOCK};

pub struct PostgresClient {
client: Client,
pool: Pool,
}

impl PostgresClient {
pub async fn new(database_url: &str) -> Result<Self, Error> {
let (client, connection) = tokio_postgres::connect(database_url, NoTls).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("Error - Failed to establish postgres connection: {}", e);
}
});
Ok(Self { client })
/// Build a `PostgresClient` backed by a deadpool connection pool.
///
/// Connection parameters are read from the environment: `DB_USER`,
/// `DB_PASSWORD`, `DB_NAME`, `DB_HOST`, `DB_PORT`. A missing variable
/// panics inside `get_env`; a non-numeric `DB_PORT` panics here.
///
/// # Errors
/// Currently never returns `Err` itself; the `Result` return type is
/// kept for interface stability with callers using `?`.
pub async fn new() -> Result<Self, Error> {
    let mut pg_config = tokio_postgres::Config::new();

    pg_config
        .user(&get_env("DB_USER"))
        .password(get_env("DB_PASSWORD"))
        .dbname(&get_env("DB_NAME"))
        .host(&get_env("DB_HOST"))
        .port(
            get_env("DB_PORT")
                .parse::<u16>()
                .expect("DB_PORT must be a valid u16 port number"),
        );

    // Fast recycling skips the liveness round-trip on checkout;
    // presumably acceptable because the RDS proxy in front of the
    // database culls broken connections — TODO confirm.
    let mgr_config = ManagerConfig {
        recycling_method: RecyclingMethod::Fast,
    };
    let mgr = Manager::from_config(pg_config, NoTls, mgr_config);

    // Pool construction only fails on an invalid builder configuration,
    // so a failure here is a programming error, not a runtime fault.
    let pool = Pool::builder(mgr)
        .max_size(16)
        .build()
        .expect("failed to build postgres connection pool");

    Ok(Self { pool })
}

pub async fn insert_into_combined_stake_filter_self_stake(
Expand All @@ -32,7 +42,8 @@ impl PostgresClient {
) -> Result<(), Error> {
let mut decimal_amount = Decimal::from_str(amount).unwrap();
let _ = decimal_amount.set_scale(18).unwrap();
self.client.execute("INSERT INTO registry_gtcstakeevent (event_type, round_id, staker, amount, staked, block_number, tx_hash) VALUES ($1, $2, $3, $4, $5, $6, $7)",&[&"SelfStake", &round_id, &staker, &decimal_amount, &staked, &block_number, &tx_hash]).await?;
let client = self.pool.get().await.unwrap();
client.execute("INSERT INTO registry_gtcstakeevent (event_type, round_id, staker, amount, staked, block_number, tx_hash) VALUES ($1, $2, $3, $4, $5, $6, $7)",&[&"SelfStake", &round_id, &staker, &decimal_amount, &staked, &block_number, &tx_hash]).await?;
println!("Row inserted into registry_gtcstakeevent with type SelfStake!");
Ok(())
}
Expand All @@ -49,13 +60,14 @@ impl PostgresClient {
) -> Result<(), Error> {
let mut decimal_amount = Decimal::from_str(amount).unwrap();
let _ = decimal_amount.set_scale(18).unwrap();
self.client.execute("INSERT INTO registry_gtcstakeevent (event_type, round_id, staker, address, amount, staked, block_number, tx_hash) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)", &[&"Xstake", &round_id, &staker, &user, &decimal_amount, &staked, &block_number, &tx_hash]).await?;
let client = self.pool.get().await.unwrap();
client.execute("INSERT INTO registry_gtcstakeevent (event_type, round_id, staker, address, amount, staked, block_number, tx_hash) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)", &[&"Xstake", &round_id, &staker, &user, &decimal_amount, &staked, &block_number, &tx_hash]).await?;
println!("Row inserted into registry_gtcstakeevent with type Xstake!");
Ok(())
}
pub async fn get_latest_block(&self) -> Result<i32, Error> {
let latest_block_rows = self
.client
let client = self.pool.get().await.unwrap();
let latest_block_rows = client
.query(
"SELECT block_number FROM registry_gtcstakeevent ORDER BY id DESC LIMIT 1;",
&[],
Expand Down
32 changes: 27 additions & 5 deletions infra/lib/scorer/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,13 @@ export async function createScoreExportBucketAndDomain(
export const dockerGtcStakingIndexerImage = `${process.env["DOCKER_GTC_PASSPORT_INDEXER_IMAGE"]}`;

type IndexerServiceParams = {
indexerRdsConnectionUrl: Input<string>;
rdsConnectionConfig: {
dbUsername: string;
dbPassword: Output<string>;
dbName: string;
dbHost: Output<string>;
dbPort: string;
};
cluster: Cluster;
vpc: awsx.ec2.Vpc;
privateSubnetSecurityGroup: aws.ec2.SecurityGroup;
Expand All @@ -560,7 +566,7 @@ type IndexerServiceParams = {
};

export function createIndexerService({
indexerRdsConnectionUrl,
rdsConnectionConfig,
cluster,
vpc,
privateSubnetSecurityGroup,
Expand All @@ -578,10 +584,26 @@ export function createIndexerService({
},
];

const indexerEnvironment = [
const indexerEnvironment: { name: string; value: Input<string> }[] = [
{
name: "DATABASE_URL",
value: indexerRdsConnectionUrl,
name: "DB_USER",
value: rdsConnectionConfig.dbUsername,
},
{
name: "DB_PASSWORD",
value: rdsConnectionConfig.dbPassword,
},
{
name: "DB_HOST",
value: rdsConnectionConfig.dbHost,
},
{
name: "DB_PORT",
value: rdsConnectionConfig.dbPort,
},
{
name: "DB_NAME",
value: rdsConnectionConfig.dbName,
},
];

Expand Down
14 changes: 10 additions & 4 deletions infra/prod/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -285,9 +285,7 @@ export const rdsArn = postgresql.arn;
export const rdsConnectionUrl = pulumi.secret(
pulumi.interpolate`psql://${dbUsername}:${dbPassword}@${scorerDbProxyEndpoint}/${dbName}`
);
export const indexerRdsConnectionUrl = pulumi.secret(
pulumi.interpolate`postgres://${dbUsername}:${dbPassword}@${scorerDbProxyEndpoint}/${dbName}`
);

export const readreplica0ConnectionUrl = pulumi.secret(
pulumi.interpolate`psql://${dbUsername}:${dbPassword}@${readreplica0.endpoint}/${dbName}`
);
Expand Down Expand Up @@ -1472,8 +1470,16 @@ const exportVals = createScoreExportBucketAndDomain(
route53ZoneForPublicData
);

const rdsConnectionConfig = {
dbUsername,
dbPassword,
dbName,
dbHost: scorerDbProxyEndpoint,
dbPort: String(5432),
};

createIndexerService({
indexerRdsConnectionUrl,
rdsConnectionConfig,
cluster,
vpc,
privateSubnetSecurityGroup,
Expand Down
23 changes: 10 additions & 13 deletions infra/staging/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,9 +260,7 @@ export const rdsArn = postgresql.arn;
export const rdsConnectionUrl = pulumi.secret(
pulumi.interpolate`psql://${dbUsername}:${dbPassword}@${scorerDbProxyEndpoint}/${dbName}`
);
export const indexerRdsConnectionUrl = pulumi.secret(
pulumi.interpolate`postgres://${dbUsername}:${dbPassword}@${scorerDbProxyEndpoint}/${dbName}`
);

export const rdsId = postgresql.id;

//////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -1427,22 +1425,21 @@ const exportVals = createScoreExportBucketAndDomain(
route53ZoneForPublicData
);

// TODO: remove once prod is verified to be working
// createIndexerService(
// indexerRdsConnectionUrl,
// cluster,
// vpc,
// privateSubnetSecurityGroup,
// workerRole
// );

// SNS topic used to deliver alert notifications to PagerDuty.
// NOTE(review): tracingConfig "PassThrough" relates to X-Ray trace
// propagation on the topic — confirm this is the intended setting.
const pagerdutyTopic = new aws.sns.Topic("pagerduty", {
name: "Pagerduty",
tracingConfig: "PassThrough",
});

// Connection settings handed to the indexer task as individual env vars
// (user/password/name/host/port) instead of a single connection URL.
const rdsConnectionConfig = {
  dbUsername,
  dbPassword,
  dbName,
  dbPort: `${5432}`,
  dbHost: scorerDbProxyEndpoint,
};

createIndexerService({
indexerRdsConnectionUrl,
rdsConnectionConfig,
cluster,
vpc,
privateSubnetSecurityGroup,
Expand Down

0 comments on commit 2d44885

Please sign in to comment.