Skip to content

Commit

Permalink
osmosis-oracle: better subscription process
Browse files Browse the repository at this point in the history
  • Loading branch information
uint committed Oct 31, 2023
1 parent ea6fee0 commit a7b7a2d
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 60 deletions.
48 changes: 11 additions & 37 deletions contracts/osmosis-price-provider/src/contract.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use std::collections::HashMap;
use std::str::FromStr;

use cosmwasm_schema::cw_serde;
use cosmwasm_std::{
ensure_eq, entry_point, Decimal, DepsMut, Env, IbcChannel, Response, Timestamp,
};
Expand All @@ -15,7 +13,7 @@ use sylvia::{contract, schemars};

use crate::error::ContractError;
use crate::ibc::make_ibc_packet;
use crate::state::Config;
use crate::state::{Config, Subscription, Subscriptions};

pub const CONTRACT_NAME: &str = env!("CARGO_PKG_NAME");
pub const CONTRACT_VERSION: &str = env!("CARGO_PKG_VERSION");
Expand All @@ -24,16 +22,10 @@ const BASE_ASSET: &str = "OSMO";

const EPOCH_IN_SECS: u64 = 120;
const LAST_EPOCH: Item<'static, Timestamp> = Item::new("last_epoch");
const SUBSCRIPTIONS: Item<'static, HashMap<String, Subscription>> = Item::new("subscriptions");

#[cw_serde]
pub struct Subscription {
channel: IbcChannel,
pool_id: u64,
}

pub struct OsmosisPriceProvider {
config: Item<'static, Config>,
pub(crate) subscriptions: Subscriptions,
}

#[cfg_attr(not(feature = "library"), sylvia::entry_points)]
Expand All @@ -43,6 +35,7 @@ impl OsmosisPriceProvider {
pub const fn new() -> Self {
Self {
config: Item::new("config"),
subscriptions: Subscriptions::new(),
}
}

Expand All @@ -57,6 +50,7 @@ impl OsmosisPriceProvider {
let admin = ctx.deps.api.addr_validate(&admin)?;
let config = Config { admin };
self.config.save(ctx.deps.storage, &config)?;
self.subscriptions.init(ctx.deps.storage)?;
LAST_EPOCH.save(ctx.deps.storage, &Timestamp::from_seconds(0))?;

set_contract_version(ctx.deps.storage, CONTRACT_NAME, CONTRACT_VERSION)?;
Expand All @@ -65,7 +59,7 @@ impl OsmosisPriceProvider {
}

#[msg(exec)]
pub fn subscribe(
pub fn bind(
&self,
ctx: ExecCtx,
denom: String,
Expand All @@ -75,31 +69,10 @@ impl OsmosisPriceProvider {
let cfg = self.config.load(ctx.deps.storage)?;
ensure_eq!(ctx.info.sender, cfg.admin, ContractError::Unauthorized {});

let mut subs = SUBSCRIPTIONS.load(ctx.deps.storage)?;
self.subscriptions
.bind_channel(ctx.deps.storage, channel, denom, pool_id)?;

if subs.contains_key(&denom) {
Err(ContractError::SubscriptionAlreadyExists)
} else {
subs.insert(denom, Subscription { channel, pool_id });
SUBSCRIPTIONS.save(ctx.deps.storage, &subs)?;
Ok(Response::new())
}
}

#[msg(exec)]
pub fn unsubscribe(&self, ctx: ExecCtx, denom: String) -> Result<Response, ContractError> {
let cfg = self.config.load(ctx.deps.storage)?;
ensure_eq!(ctx.info.sender, cfg.admin, ContractError::Unauthorized {});

let mut subs = SUBSCRIPTIONS.load(ctx.deps.storage)?;

if !subs.contains_key(&denom) {
Err(ContractError::SubscriptionDoesNotExist)
} else {
subs.remove(&denom);
SUBSCRIPTIONS.save(ctx.deps.storage, &subs)?;
Ok(Response::new())
}
Ok(Response::new())
}
}

Expand All @@ -114,11 +87,12 @@ pub fn sudo(
let last_epoch = LAST_EPOCH.load(deps.storage)?;
let secs_since_last_epoch = env.block.time.seconds() - last_epoch.seconds();
if secs_since_last_epoch >= EPOCH_IN_SECS {
let subs = SUBSCRIPTIONS.load(deps.storage)?;
let contract = OsmosisPriceProvider::new();

let subs = contract.subscriptions.subs(deps.storage)?;
let querier = TwapQuerier::new(&deps.querier);

let msgs = subs
.into_iter()
.map(|(denom, Subscription { channel, pool_id })| {
let twap = querier
.arithmetic_twap_to_now(pool_id, BASE_ASSET.to_string(), denom, None)?
Expand Down
5 changes: 4 additions & 1 deletion contracts/osmosis-price-provider/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ pub enum ContractError {
#[error("Contract already has an open IBC channel")]
IbcChannelAlreadyOpen,

#[error("The provided IBC channel is not open")]
IbcChannelNotOpen,

#[error("You must start the channel handshake on this side, it doesn't support OpenTry")]
IbcOpenTryDisallowed,

Expand All @@ -29,6 +32,6 @@ pub enum ContractError {
#[error("A subscription for the provided denom does not exist")]
SubscriptionDoesNotExist,

#[error("There is no subscription for the provided denom")]
#[error("A subscription already exists for the provided denom")]
SubscriptionAlreadyExists,
}
39 changes: 18 additions & 21 deletions contracts/osmosis-price-provider/src/ibc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,19 @@ use cosmwasm_std::{
IbcChannelOpenResponse, IbcMsg, IbcPacketAckMsg, IbcPacketReceiveMsg, IbcPacketTimeoutMsg,
IbcReceiveResponse, IbcTimeout, Timestamp,
};
use cw_storage_plus::Item;

use crate::error::ContractError;
use mesh_apis::ibc::{
validate_channel_order, AckWrapper, PriceFeedProviderPacket, ProtocolVersion,
};

use crate::{contract::OsmosisPriceProvider, error::ContractError};

const PROTOCOL_NAME: &str = "mesh-security-price-feed";
/// This is the maximum version of the price feed protocol that we support
const SUPPORTED_IBC_PROTOCOL_VERSION: &str = "0.1.0";
/// This is the minimum version that we are compatible with
const MIN_IBC_PROTOCOL_VERSION: &str = "0.1.0";

// IBC specific state
pub const IBC_CHANNEL: Item<IbcChannel> = Item::new("ibc_channel");

const TIMEOUT: u64 = 60 * 60;

pub fn packet_timeout(now: &Timestamp) -> IbcTimeout {
Expand All @@ -33,14 +30,10 @@ pub fn packet_timeout(now: &Timestamp) -> IbcTimeout {
#[cfg_attr(not(feature = "library"), entry_point)]
/// enforces ordering and versioning constraints
pub fn ibc_channel_open(
deps: DepsMut,
_deps: DepsMut,
_env: Env,
msg: IbcChannelOpenMsg,
) -> Result<IbcChannelOpenResponse, ContractError> {
// ensure we have no channel yet
if IBC_CHANNEL.may_load(deps.storage)?.is_some() {
return Err(ContractError::IbcChannelAlreadyOpen);
}
// ensure we are called with OpenInit
let channel = match msg {
IbcChannelOpenMsg::OpenInit { channel } => channel,
Expand Down Expand Up @@ -73,13 +66,9 @@ pub fn ibc_channel_open(
/// once it's established, we store data
pub fn ibc_channel_connect(
deps: DepsMut,
env: Env,
_env: Env,
msg: IbcChannelConnectMsg,
) -> Result<IbcBasicResponse, ContractError> {
// ensure we have no channel yet
if IBC_CHANNEL.may_load(deps.storage)?.is_some() {
return Err(ContractError::IbcChannelAlreadyOpen);
}
// ensure we are called with OpenAck
let (channel, counterparty_version) = match msg {
IbcChannelConnectMsg::OpenAck {
Expand All @@ -96,25 +85,33 @@ pub fn ibc_channel_connect(
let v: ProtocolVersion = from_slice(counterparty_version.as_bytes())?;
v.verify_compatibility(SUPPORTED_IBC_PROTOCOL_VERSION, MIN_IBC_PROTOCOL_VERSION)?;

todo!("store the channel in subscriptions");
let contract = OsmosisPriceProvider::new();
contract
.subscriptions
.register_channel(deps.storage, channel)?;

Ok(IbcBasicResponse::new())
}

#[cfg_attr(not(feature = "library"), entry_point)]
pub fn ibc_channel_close(
_deps: DepsMut,
deps: DepsMut,
_env: Env,
_msg: IbcChannelCloseMsg,
msg: IbcChannelCloseMsg,
) -> Result<IbcBasicResponse, ContractError> {
todo!("remove subscription");
let contract = OsmosisPriceProvider::new();
contract
.subscriptions
.remove_channel(deps.storage, msg.channel())?;

Ok(IbcBasicResponse::new())
}

#[cfg_attr(not(feature = "library"), entry_point)]
pub fn ibc_packet_receive(
deps: DepsMut,
_deps: DepsMut,
_env: Env,
msg: IbcPacketReceiveMsg,
_msg: IbcPacketReceiveMsg,
) -> Result<IbcReceiveResponse, ContractError> {
// this contract only sends out update packets over IBC - it's not meant to receive any
Err(ContractError::IbcPacketRecvDisallowed)
Expand Down
94 changes: 93 additions & 1 deletion contracts/osmosis-price-provider/src/state.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,99 @@
use std::collections::HashMap;

use cosmwasm_schema::cw_serde;
use cosmwasm_std::Addr;
use cosmwasm_std::{Addr, IbcChannel, StdError, Storage};
use cw_storage_plus::Item;

use crate::error::ContractError;

#[cw_serde]
pub struct Config {
pub admin: Addr,
}

pub struct Subscriptions {
by_denom: Item<'static, HashMap<String, Subscription>>,
inactive: Item<'static, Vec<IbcChannel>>,
}

impl Subscriptions {
pub(crate) const fn new() -> Self {
Self {
by_denom: Item::new("subscriptions_by_denom"),
inactive: Item::new("subscriptions_inactive"),
}
}

pub(crate) fn init(&self, storage: &mut dyn Storage) -> Result<(), StdError> {
self.by_denom.save(storage, &HashMap::new())?;
self.inactive.save(storage, &vec![])?;

Ok(())
}

pub(crate) fn register_channel(
&self,
storage: &mut dyn Storage,
channel: IbcChannel,
) -> Result<(), ContractError> {
self.inactive.update(storage, |mut v| {
if v.iter().find(|c| **c == channel).is_some() {

Check failure on line 40 in contracts/osmosis-price-provider/src/state.rs

View workflow job for this annotation

GitHub Actions / Lints

called `is_some()` after searching an `Iterator` with `find`
Err(ContractError::IbcChannelAlreadyOpen)
} else {
v.push(channel);
Ok(v)
}
})?;

Ok(())
}

pub(crate) fn bind_channel(
&self,
storage: &mut dyn Storage,
channel: IbcChannel,
denom: String,
pool_id: u64,
) -> Result<(), ContractError> {
self.inactive.update(storage, |mut v| {
if let Some((ix, _)) = v.iter().enumerate().find(|(_, c)| **c == channel) {
v.remove(ix);
Ok(v)
} else {
Err(ContractError::IbcChannelNotOpen)
}
})?;

self.by_denom.update(storage, |mut map| {
map.insert(denom, Subscription { channel, pool_id })
.is_none()
.then_some(map)
.ok_or(ContractError::SubscriptionAlreadyExists)
})?;

Ok(())
}

pub(crate) fn subs(
&self,
storage: &mut dyn Storage,
) -> Result<impl Iterator<Item = (String, Subscription)>, ContractError> {
let list = self.by_denom.load(storage)?;

Ok(list.into_iter())
}

pub(crate) fn remove_channel(
&self,
_storage: &mut dyn Storage,
_channel: &IbcChannel,
) -> Result<(), ContractError> {
todo!()
}
}

#[cw_serde]
pub struct Subscription {
pub channel: IbcChannel,
pub pool_id: u64,
}

0 comments on commit a7b7a2d

Please sign in to comment.