Skip to content

Commit

Permalink
Improve error handling with anyhow (#21)
Browse files Browse the repository at this point in the history
  • Loading branch information
williamlsh committed Dec 4, 2022
1 parent 8a6ccfe commit 9351be9
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 103 deletions.
25 changes: 14 additions & 11 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::{
database::Database,
Config,
};
use anyhow::{anyhow, Context, Result};
use log::info;
use reqwest::Client;

Expand All @@ -15,49 +16,51 @@ pub struct App {
}

impl App {
pub fn new(config: Config) -> Result<Self, String> {
pub fn new(config: Config) -> Self {
let database = Database::open(config.rocksdb_path.as_path());
let client = Client::new();
Ok(Self {
Self {
database,
client,
config,
})
}
}

pub async fn poll(&mut self) -> Result<(), String> {
pub async fn poll(&mut self) -> Result<()> {
info!("Starting to poll Twitter timeline from config.");
Poll::new(self.config.twitter_token.take(), self.poll_config()?)?
.run(&self.client, &self.database)
.await
.with_context(|| "Failed to execute poll command")
}

pub async fn push(&mut self) -> Result<(), String> {
pub async fn push(&mut self) -> Result<()> {
info!("Starting to push timeline to Telegram channel(s) from config.");
Push::new(self.config.telegram_token.take(), self.push_config()?)?
.run(&self.client, &mut self.database)
.await
.with_context(|| "Failed to execute push command")
}

pub fn info(&self) -> Result<(), String> {
pub fn info(&self) -> Result<()> {
info!("Overview info of database.");
info(&self.database).map_err(|err| format!("Error displaying database data: {:?}", err))
info(&self.database).with_context(|| "Failed to execute info command")
}

/// Returns poll configs that are included.
fn poll_config(&mut self) -> Result<Vec<PollConfig>, String> {
fn poll_config(&mut self) -> Result<Vec<PollConfig>> {
self.config
.poll
.take()
.map(|cfg| cfg.into_iter().filter(|cfg| cfg.included).collect())
.ok_or_else(|| "Empty poll config".into())
.ok_or_else(|| anyhow!("Empty poll config"))
}

fn push_config(&mut self) -> Result<Vec<PushConfig>, String> {
fn push_config(&mut self) -> Result<Vec<PushConfig>> {
self.config
.push
.take()
.map(|cfg| cfg.into_iter().filter(|cfg| cfg.included).collect())
.ok_or_else(|| "Empty poll config".into())
.ok_or_else(|| anyhow!("Empty push config"))
}
}
38 changes: 17 additions & 21 deletions src/commands/poll.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use anyhow::{anyhow, Context, Result};
use chrono::{DateTime, Duration};
use log::{info, trace};
use reqwest::Client;
Expand All @@ -17,18 +18,15 @@ pub(crate) struct Poll {
}

impl Poll {
pub(crate) fn new(
twitter_token: Option<String>,
poll_config: Vec<PollConfig>,
) -> Result<Self, String> {
let twitter_token = twitter_token.ok_or("Empty twitter token")?;
pub(crate) fn new(twitter_token: Option<String>, poll_config: Vec<PollConfig>) -> Result<Self> {
let twitter_token = twitter_token.ok_or_else(|| anyhow!("Empty twitter token"))?;
Ok(Self {
twitter_token,
config: poll_config,
})
}

pub(crate) async fn run(&mut self, client: &Client, database: &Database) -> Result<(), String> {
pub(crate) async fn run(&mut self, client: &Client, database: &Database) -> Result<()> {
let user_map = self.user_map(client).await?;

// Loop Twitter users in poll configs.
Expand All @@ -38,7 +36,7 @@ impl Poll {
cfg.insert_start_time(start_time);
info!("Polling timeline with config: {:?}", cfg);

let endpoint = Self::endpoint(cfg, &user_map);
let endpoint = Self::endpoint(cfg, &user_map)?;
// Note: `since_id` takes higher priority than `start_time` in request query parameters.
let since_id = cfg.since_id.take().map(PaginationToken::TweetID);
let mut timeline = Timeline::new(client, endpoint, &self.twitter_token, since_id);
Expand All @@ -62,54 +60,52 @@ impl Poll {
// Gets `create_at` of a latest tweet in persistent state, then adds one second to it
// to be used as `start_time` in timeline request query. This is necessary to deduplicate
// a tweet when polling.
fn fetch_state(database: &Database, username: &str) -> Result<Option<String>, String> {
fn fetch_state(database: &Database, username: &str) -> Result<Option<String>> {
if let Some(value) = database.get_cf("state", username)? {
let value_str = str::from_utf8(&value)
.map_err(|err| format!("could not convert string from bytes: {:?}", err))?;
Ok(DateTime::parse_from_rfc3339(value_str)
.map_err(|err| format!("could not parse date time from string: {:?}", err))?
let value_str = str::from_utf8(&value)?;
Ok(DateTime::parse_from_rfc3339(value_str)?
.checked_add_signed(Duration::seconds(1))
.map(|datetime| datetime.to_rfc3339()))
} else {
Ok(None)
}
}

fn upsert_state(database: &Database, username: &str, created_at: &str) -> Result<(), String> {
fn upsert_state(database: &Database, username: &str, created_at: &str) -> Result<()> {
trace!("Upsert state: key: {}, value: {}", username, created_at);
database.put_cf("state", username, created_at)
}

fn insert_tweet(database: &Database, username: &str, tweet: &Tweet) -> Result<(), String> {
fn insert_tweet(database: &Database, username: &str, tweet: &Tweet) -> Result<()> {
let key = format!("{}:{}", username, tweet.id);
let value = serde_json::to_vec(&tweet)
.map_err(|err| format!("could not serialize tweet data to json: {:?}", err))?;
let value =
serde_json::to_vec(&tweet).with_context(|| "could not serialize tweet data to json")?;
trace!("Insert tweet: key: {}, value: {:?}", key, tweet);
database.put_cf("timeline", key, value)
}

fn endpoint(config: &PollConfig, user_map: &HashMap<String, String>) -> Url {
fn endpoint(config: &PollConfig, user_map: &HashMap<String, String>) -> Result<Url> {
// Unwrap it directly since we are sure it's not None.
let user_id = user_map.get(config.username.as_str()).unwrap();
UrlBuilder::new(user_id)
Ok(UrlBuilder::new(user_id)?
.tweet_fields(vec!["created_at"])
// Set default `max_results` value: 100.
.max_results(config.max_results.unwrap_or(100))
.start_time(config.start_time.as_deref())
.end_time(config.end_time.as_deref())
.build()
.build())
}

/// Returns a username to user_id map.
async fn user_map(&self, client: &Client) -> Result<HashMap<String, String>, String> {
async fn user_map(&self, client: &Client) -> Result<HashMap<String, String>> {
let usernames = self
.config
.iter()
.map(|cfg| cfg.username.as_str())
.collect();
Users::fetch(client, usernames, &self.twitter_token)
.await?
.ok_or_else(|| "No Twitter users found".into())
.ok_or_else(|| anyhow!("No Twitter users found"))
}
}

Expand Down
25 changes: 10 additions & 15 deletions src/commands/push.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use anyhow::{anyhow, Context, Result};
use log::{debug, info, warn};
use reqwest::{Client, StatusCode};
use std::{collections::HashMap, str, time::Duration};
Expand All @@ -21,11 +22,8 @@ pub(crate) struct Push {
}

impl Push {
pub(crate) fn new(
telegram_token: Option<String>,
config: Vec<PushConfig>,
) -> Result<Self, String> {
let telegram_token = telegram_token.ok_or("Empty Telegram token")?;
pub(crate) fn new(telegram_token: Option<String>, config: Vec<PushConfig>) -> Result<Self> {
let telegram_token = telegram_token.ok_or_else(|| anyhow!("Empty Telegram token"))?;
Ok(Self {
telegram_token,
config,
Expand All @@ -34,11 +32,7 @@ impl Push {
})
}

pub(crate) async fn run(
&mut self,
client: &Client,
database: &mut Database,
) -> Result<(), String> {
pub(crate) async fn run(&mut self, client: &Client, database: &mut Database) -> Result<()> {
let user_map = self.user_map();
// Read timeline column family from database.
// Note: we're sure there's a timeline iterator, so just unwrap it directly.
Expand All @@ -50,10 +44,8 @@ impl Push {
}

let (twitter_username, tweet) = {
let key_str = str::from_utf8(&key)
.map_err(|err| format!("could not convert string from bytes: {:?}", err))?;
let tweet: Tweet = serde_json::from_slice(&value)
.map_err(|err| format!("could not decode data from bytes: {:?}", err))?;
let key_str = str::from_utf8(&key)?;
let tweet: Tweet = serde_json::from_slice(&value)?;
// Unwrap it directly since we're sure it's Some(&str).
let (twitter_username, _) = key_str.split_once(':').unwrap();
(twitter_username, tweet)
Expand All @@ -65,7 +57,10 @@ impl Push {
chat_id: format!("@{}", telegram_channel),
text: tweet.text,
};
let response = message.send(client, &self.telegram_token).await?;
let response = message
.send(client, &self.telegram_token)
.await
.with_context(|| "Failed to send message to Telegram channel")?;
match response.status() {
// Note: Telegram bot api applies requests rate limit.
StatusCode::OK => time::sleep(Duration::from_secs(3)).await,
Expand Down
44 changes: 15 additions & 29 deletions src/database.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use anyhow::{anyhow, Context, Result};
use rocksdb::{ColumnFamilyDescriptor, DBIterator, IteratorMode, Options, DB};
use std::path::Path;

Expand All @@ -19,34 +20,24 @@ impl Database {
Self(db)
}

pub(crate) fn put_cf<K, V>(&self, cf: &str, key: K, value: V) -> Result<(), String>
pub(crate) fn put_cf<K, V>(&self, cf: &str, key: K, value: V) -> Result<()>
where
K: AsRef<[u8]>,
V: AsRef<[u8]>,
{
match self.0.cf_handle(cf) {
Some(cf_handle) => self
.0
.put_cf(cf_handle, key, value)
.map_err(|error| format!("could not put data into rocksdb: {:?}", error)),
None => Err(format!("no such column family: {}", cf)),
Some(cf_handle) => Ok(self.0.put_cf(cf_handle, key, value)?),
None => Err(anyhow!("no such column family: {}", cf)),
}
}

pub(crate) fn get_cf<K: AsRef<[u8]>>(
&self,
cf: &str,
key: K,
) -> Result<Option<Vec<u8>>, String> {
pub(crate) fn get_cf<K: AsRef<[u8]>>(&self, cf: &str, key: K) -> Result<Option<Vec<u8>>> {
match self.0.cf_handle(cf) {
Some(cf_handle) => match self.0.get_cf(cf_handle, key) {
Ok(value) => Ok(value),
Err(error) => Err(format!(
"could not get value from column family: {:?}",
error
)),
},
None => Err(format!("no such column family: {}", cf)),
Some(cf_handle) => self
.0
.get_cf(cf_handle, key)
.with_context(|| "could not get value from column family"),
None => Err(anyhow!("no such column family: {}", cf)),
}
}

Expand All @@ -57,23 +48,18 @@ impl Database {
}

/// Performs an `from` inclusive but `to` exclusive range (`["from", "to")`) deletion.
pub(crate) fn delete_range_cf<K>(&self, cf: &str, from: K, to: K) -> Result<(), String>
pub(crate) fn delete_range_cf<K>(&self, cf: &str, from: K, to: K) -> Result<()>
where
K: AsRef<[u8]>,
{
match self.0.cf_handle(cf) {
Some(cf_handle) => self
.0
.delete_range_cf(cf_handle, from, to)
.map_err(|err| format!("could not delete entries range: {:?}", err)),
None => Err(format!("no such column family: {}", cf)),
Some(cf_handle) => Ok(self.0.delete_range_cf(cf_handle, from, to)?),
None => Err(anyhow!("no such column family: {}", cf)),
}
}

pub(crate) fn drop_cf(&mut self, cf: &str) -> Result<(), String> {
self.0
.drop_cf(cf)
.map_err(|err| format!("could not drop column family {}: {:?}", cf, err))
pub(crate) fn drop_cf(&mut self, cf: &str) -> Result<()> {
Ok(self.0.drop_cf(cf)?)
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ async fn main() -> anyhow::Result<()> {

let cli = Cli::parse();
let config = load_config(cli.config_path).await?;
let mut app = App::new(config).unwrap();
let mut app = App::new(config);
match cli.command {
Command::Poll => app.poll().await.unwrap(),
Command::Push => app.push().await.unwrap(),
Command::Info => app.info().unwrap(),
Command::Poll => app.poll().await?,
Command::Push => app.push().await?,
Command::Info => app.info()?,
}
Ok(())
}
Expand Down
17 changes: 8 additions & 9 deletions src/telegram.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use anyhow::{Context, Result};
use reqwest::{Client, Response};
use serde::Serialize;
use url::Url;
Expand All @@ -12,24 +13,22 @@ pub(crate) struct Message {
}

impl Message {
pub(crate) async fn send(&self, client: &Client, telegram_token: &str) -> Result<Response, String> {
client
pub(crate) async fn send(&self, client: &Client, telegram_token: &str) -> Result<Response> {
Ok(client
.post(endpoint(telegram_token)?)
.json(self)
.send()
.await
.map_err(|err| format!("Error sending post request: {:?}", err))
.await?)
}
}

/// An endpoint for sending messages by Telegram bot.
/// See: https://core.telegram.org/bots/api#sendmessage
fn endpoint(token: &str) -> Result<Url, String> {
fn endpoint(token: &str) -> Result<Url> {
let api = Url::parse("https://api.telegram.org/")
.map_err(|error| format!("could not parse telegram api base endpoint: {}", error))?;
let url = Url::options()
.with_context(|| "Could not parse Telegram api base endpoint")?;
Url::options()
.base_url(Some(&api))
.parse(format!("/bot{token}/sendMessage").as_str())
.map_err(|error| format!("could not parse path :{}", error))?;
Ok(url)
.with_context(|| "could not parse Telegram api path")
}
Loading

0 comments on commit 9351be9

Please sign in to comment.