From 0a7c569c48a47ad521b8e27edbcee389d70ceed5 Mon Sep 17 00:00:00 2001
From: toast
Date: Sat, 7 Dec 2024 15:50:52 +1100
Subject: [PATCH] Rework the RSS processing

---
 .gitea/workflows/build.yml |  12 ++-
 tasks/src/rss.rs           |  49 ++++++++-
 tasks/src/rss/esxi.rs      | 145 +++++++++++++++-----------
 tasks/src/rss/github.rs    | 173 ++++++++++++++++++-----------
 tasks/src/rss/gportal.rs   | 179 ++++++++++++++++++--------------
 tasks/src/rss/processor.rs | 203 ++++++++++++++++++++++---------------
 tasks/src/rss/rust.rs      | 108 +++++++++++++-------
 7 files changed, 533 insertions(+), 336 deletions(-)

diff --git a/.gitea/workflows/build.yml b/.gitea/workflows/build.yml
index fed607a..511f48e 100644
--- a/.gitea/workflows/build.yml
+++ b/.gitea/workflows/build.yml
@@ -9,11 +9,13 @@ concurrency:
   group: ${{ github.workflow }}-${{ github.ref }}
   cancel-in-progress: true
 
+env:
+  CARGO_TERM_COLOR: always
+  RUNNER_TOOL_CACHE: /toolcache
+
 jobs:
   build:
     runs-on: ubuntu-22.04
-    env:
-      RUNNER_TOOL_CACHE: /toolcache
 
     steps:
       - name: Set up Docker Buildx
@@ -42,7 +44,8 @@ jobs:
             $HOME/.cargo/registry/index/
             $HOME/.cargo/registry/cache/
             target/
-          key: ${{ runner.os }}-cache-${{ steps.cargo-cache-key.outputs.hash }}
+          key: ${{ runner.os }}-cargo-${{ steps.cargo-cache-key.outputs.hash }}
+          restore-keys: ${{ runner.os }}-cargo-
 
       - name: Login to Gitea
@@ -65,7 +68,6 @@ jobs:
        run: |
          rustup target add x86_64-unknown-linux-musl
          export GIT_COMMIT_HASH=${{ github.sha }} && \
-         export GIT_COMMIT_BRANCH=${{ github.ref_name }} && \
          cargo zigbuild --target x86_64-unknown-linux-musl --locked -rF production
 
       - name: Build and push image
@@ -84,7 +86,7 @@ jobs:
     needs: build
     steps:
       - name: Deploy update
-        uses: appleboy/ssh-action@v1.2.0
+        uses: appleboy/ssh-action@v1
        with:
          host: ${{ secrets.SSH_HOST }}
          username: ${{ secrets.SSH_USERNAME }}
diff --git a/tasks/src/rss.rs b/tasks/src/rss.rs
index 11a86c9..c9fe50d 100644
--- a/tasks/src/rss.rs
+++ b/tasks/src/rss.rs
@@ -5,6 +5,13 @@ mod github;
 mod gportal;
 mod rust;
 
+use {
+  esxi::Esxi,
+  github::GitHub,
+  gportal::GPortal,
+  rust::RustBlog
+};
+
 use super::{
   task_err,
   task_info
@@ -21,7 +28,8 @@ use {
   poise::serenity_prelude::{
     Context,
     CreateEmbed,
-    Timestamp
+    Timestamp,
+    async_trait
   },
   regex::Regex,
   reqwest::Response,
@@ -32,6 +40,8 @@ use {
   }
 };
 
+pub type RSSFeedBox = Box<dyn RSSFeed + Send + Sync>;
+
 const TASK_NAME: &str = "RSS";
 static REDIS_EXPIRY_SECS: i64 = 7200;
 static REDIS_SERVICE: OnceCell<Arc<RedisController>> = OnceCell::new();
@@ -94,7 +104,7 @@ async fn save_to_redis(
   let redis = get_redis().await;
   redis.set(key, value).await.unwrap();
   if let Err(y) = redis.expire(key, REDIS_EXPIRY_SECS).await {
-    task_err("RSS", format!("[RedisExpiry]: {}", y).as_str());
+    task_err("RSS", format!("[RedisExpiry]: {y}").as_str());
   }
   Ok(())
 }
@@ -143,6 +153,23 @@ impl IncidentColorMap {
   }
 }
 
+#[async_trait]
+pub trait RSSFeed {
+  fn name(&self) -> &str;
+  fn url(&self) -> &str;
+  async fn process(
+    &self,
+    ctx: Arc<Context>
+  ) -> KonResult<Option<RSSFeedOutput>>;
+}
+
+/// Handle feed's output type for Discord message
+pub enum RSSFeedOutput {
+  RegularEmbed(CreateEmbed),
+  IncidentEmbed(CreateEmbed),
+  Content(String)
+}
+
 pub async fn rss(ctx: Arc<Context>) -> KonResult<()> {
   #[cfg(feature = "production")]
   let mut interval = interval(Duration::from_secs(300)); // Check feeds every 5 mins
@@ -151,6 +178,19 @@ pub async fn rss(ctx: Arc<Context>) -> KonResult<()> {
   let mut first_run = true;
   task_info(TASK_NAME, "Task loaded!");
 
+  let feeds: Vec<RSSFeedBox> = vec![
+    Box::new(Esxi::new("https://esxi-patches.v-front.de/atom/ESXi-7.0.0.xml".to_string())),
+    Box::new(GitHub::new("https://www.githubstatus.com/history.atom".to_string())),
+    Box::new(GPortal::new("https://status.g-portal.com/history.atom".to_string())),
+    Box::new(RustBlog::new("https://blog.rust-lang.org/feed.xml".to_string())),
+  ];
+
+  let mut processor = processor::RSSProcessor::new();
+
+  for feed in feeds {
+    processor.add_feed(feed);
+  }
+
   loop {
     interval.tick().await;
 
@@ -158,6 +198,9 @@ pub async fn rss(ctx: Arc<Context>) -> KonResult<()> {
     if first_run {
       task_info(&format!("{TASK_NAME}:Processor"), "Starting up!");
       first_run = false;
     }
-    processor::feed_processor(&ctx).await;
+
+    if let Err(e) = processor.process_all(ctx.clone()).await {
+      task_err(&format!("{TASK_NAME}:Processor"), &e.to_string());
+    }
   }
 }
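Reviewer note: the per-feed free functions are gone; every source now implements the RSSFeed trait above and is registered as a boxed trait object (RSSFeedBox), so the processor can drive all of them through one loop. A minimal sketch of what a new feed would look like under this scheme (ExampleFeed is hypothetical and not part of this patch):

  // Hypothetical feed, shown only to illustrate the new trait contract.
  pub struct ExampleFeed {
    url: String
  }

  #[async_trait]
  impl RSSFeed for ExampleFeed {
    fn name(&self) -> &str { "Example" } // becomes the "RSS_Example_MsgId" Redis key in the processor

    fn url(&self) -> &str { self.url.as_str() }

    async fn process(
      &self,
      _ctx: Arc<Context>
    ) -> KonResult<Option<RSSFeedOutput>> {
      // Fetch self.url(), diff it against the cached state in Redis, then
      // return Ok(None) when nothing changed, or one of the three
      // RSSFeedOutput variants when there is something to post.
      Ok(None)
    }
  }
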
diff --git a/tasks/src/rss/esxi.rs b/tasks/src/rss/esxi.rs
index d7d60cf..c949fda 100644
--- a/tasks/src/rss/esxi.rs
+++ b/tasks/src/rss/esxi.rs
@@ -1,88 +1,115 @@
 use super::{
-  super::task_err,
-  REDIS_EXPIRY_SECS,
+  RSSFeed,
+  RSSFeedOutput,
   fetch_feed,
   format_href_to_discord,
   get_redis,
   parse,
-  save_to_redis
+  save_to_redis,
+  task_err
 };
 
 use {
   kon_libs::KonResult,
   poise::serenity_prelude::{
+    Context,
     CreateEmbed,
     CreateEmbedAuthor,
-    Timestamp
+    Timestamp,
+    async_trait
   },
   regex::Regex,
-  std::io::Cursor
+  std::{
+    io::Cursor,
+    sync::Arc
+  }
 };
 
-pub async fn esxi_embed() -> KonResult<Option<CreateEmbed>> {
-  let redis = get_redis().await;
-  let rkey = "RSS_ESXi";
-  let url = "https://esxi-patches.v-front.de/atom/ESXi-7.0.0.xml";
+pub struct Esxi {
+  url: String
+}
 
-  let res = fetch_feed(url).await?;
-  let data = res.text().await?;
-  let cursor = Cursor::new(data);
+impl Esxi {
+  pub fn new(url: String) -> Self { Self { url } }
+}
 
-  let feed = parse(cursor).unwrap();
-  let home_page = feed.links[0].clone().href;
-  let article = feed.entries[0].clone();
+#[async_trait]
+impl RSSFeed for Esxi {
+  fn name(&self) -> &str { "ESXi" }
 
-  fn get_patch_version(input: &str) -> Option<String> {
-    let re = Regex::new(r#"(?i)Update\s+([0-9]+)([a-z]?)"#).unwrap();
+  fn url(&self) -> &str { self.url.as_str() }
 
-    if let Some(caps) = re.captures(input) {
-      let update_num = caps[1].to_string();
-      let letter = caps.get(2).map_or("", |m| m.as_str());
-      Some(format!("Update {}{}", update_num, letter))
-    } else {
-      None
+  async fn process(
+    &self,
+    _ctx: Arc<Context>
+  ) -> KonResult<Option<RSSFeedOutput>> {
+    let redis = get_redis().await;
+    let rkey = "RSS_ESXi";
+
+    let res = fetch_feed(self.url()).await?;
+    let data = res.text().await?;
+    let cursor = Cursor::new(data);
+
+    let feed = parse(cursor).map_err(|e| {
+      task_err("RSS:ESXi", &format!("Error parsing RSS feed: {e}"));
+      e
+    })?;
+
+    if feed.entries.is_empty() {
+      task_err("RSS:ESXi", "No entries found in the feed!");
+      return Ok(None);
     }
-  }
 
-  let cached_patch = redis.get(rkey).await.unwrap().unwrap_or_default();
+    let home_page = feed.links[0].clone().href;
+    let article = feed.entries[0].clone();
 
-  if cached_patch.is_empty() {
-    redis.set(rkey, &article.categories[3].term).await.unwrap();
-    if let Err(y) = redis.expire(rkey, REDIS_EXPIRY_SECS).await {
-      task_err("RSS", format!("[RedisExpiry]: {}", y).as_str());
+    fn get_patch_version(input: &str) -> Option<String> {
+      let re = Regex::new(r#"(?i)Update\s+([0-9]+)([a-z]?)"#).unwrap();
+
+      if let Some(caps) = re.captures(input) {
+        let update_num = caps[1].to_string();
+        let letter = caps.get(2).map_or("", |m| m.as_str());
+        Some(format!("Update {update_num}{letter}"))
+      } else {
+        None
+      }
     }
-    return Ok(None);
-  }
 
-  if let Some(patch) = get_patch_version(&article.categories[3].term) {
-    if patch == cached_patch {
-      Ok(None)
-    } else {
+    let cached_patch = redis.get(rkey).await.unwrap_or(None).unwrap_or_default();
+
+    if cached_patch.is_empty() {
       save_to_redis(rkey, &article.categories[3].term).await?;
-      Ok(Some(
-        CreateEmbed::new()
-          .color(0x4EFBCB)
-          .author(CreateEmbedAuthor::new(feed.title.unwrap().content).url(home_page))
-          .thumbnail(feed.logo.unwrap().uri)
-          .description(format!(
-            "{} {} for {} {} has been rolled out!\n{}",
-            article.categories[2].term,
-            article.categories[3].term,
-            article.categories[0].term,
-            article.categories[1].term,
-            format_href_to_discord(article.summary.unwrap().content.as_str())
-          ))
-          .timestamp(Timestamp::from(article.updated.unwrap()))
-      ))
+      return Ok(None);
+    }
+
+    if let Some(patch) = get_patch_version(&article.categories[3].term) {
+      if patch == cached_patch {
+        Ok(None)
+      } else {
+        save_to_redis(rkey, &article.categories[3].term).await?;
+
+        Ok(Some(RSSFeedOutput::RegularEmbed(
+          CreateEmbed::new()
+            .color(0x4EFBCB)
+            .author(CreateEmbedAuthor::new(feed.title.unwrap().content).url(home_page))
+            .thumbnail(feed.logo.unwrap().uri)
+            .description(format!(
+              "{} {} for {} {} has been rolled out!\n{}",
+              article.categories[2].term,
+              article.categories[3].term,
+              article.categories[0].term,
+              article.categories[1].term,
+              format_href_to_discord(&article.summary.unwrap().content)
+            ))
+            .timestamp(Timestamp::from(article.updated.unwrap()))
+        )))
+      }
+    } else {
+      task_err(
+        "RSS:ESXi",
+        &format!("Article term does not match the expected RegEx pattern! ({})", article.categories[3].term)
+      );
+      Ok(None)
     }
-  } else {
-    task_err(
-      "RSS:ESXi",
-      &format!(
-        "Article term does not match the expected RegEx pattern! ({})",
-        article.categories[3].term.as_str()
-      )
-    );
-    Ok(None)
   }
 }
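Reviewer note: get_patch_version only accepts category terms shaped like "Update <n><optional letter>", matched case-insensitively. A quick illustration of the expected behaviour, assuming the helper were hoisted out of process() so it could be unit-tested:

  #[test]
  fn extracts_patch_version() {
    // "Update" must appear literally, followed by digits and an optional letter.
    assert_eq!(get_patch_version("Update 3"), Some("Update 3".to_string()));
    assert_eq!(get_patch_version("update 3q"), Some("Update 3q".to_string()));
    assert_eq!(get_patch_version("ESXi-7.0U3"), None); // no "Update" keyword, no match
  }
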
diff --git a/tasks/src/rss/github.rs b/tasks/src/rss/github.rs
index d72f1c2..7017ca1 100644
--- a/tasks/src/rss/github.rs
+++ b/tasks/src/rss/github.rs
@@ -1,111 +1,140 @@
 use super::{
-  super::task_err,
   IncidentColorMap,
-  REDIS_EXPIRY_SECS,
+  RSSFeed,
+  RSSFeedOutput,
   embed,
   fetch_feed,
   format_html_to_discord,
   get_redis,
   parse,
   save_to_redis,
+  task_err,
   trim_old_content
 };
 
 use {
   kon_libs::KonResult,
   poise::serenity_prelude::{
-    CreateEmbed,
-    Timestamp
+    Context,
+    Timestamp,
+    async_trait
   },
   regex::Regex,
-  std::io::Cursor
+  std::{
+    io::Cursor,
+    sync::Arc
+  }
 };
 
-pub async fn github_embed() -> KonResult<Option<CreateEmbed>> {
-  let redis = get_redis().await;
-  let rkey = "RSS_GitHub";
-  let rkey_content = format!("{}_Content", rkey);
-  let url = "https://www.githubstatus.com/history.atom";
+pub struct GitHub {
+  url: String
+}
 
-  let res = fetch_feed(url).await?;
-  let data = res.text().await?;
-  let cursor = Cursor::new(data);
+impl GitHub {
+  pub fn new(url: String) -> Self { Self { url } }
+}
 
-  let feed = parse(cursor).unwrap();
-  let incident_page = feed.entries[0].links[0].clone().href;
-  let article = feed.entries[0].clone();
+#[async_trait]
+impl RSSFeed for GitHub {
+  fn name(&self) -> &str { "GitHub" }
 
-  fn get_incident_id(input: &str) -> Option<String> {
-    let re = Regex::new(r#"/incidents/([a-zA-Z0-9]+)$"#).unwrap();
+  fn url(&self) -> &str { self.url.as_str() }
 
-    re.captures(input).map(|caps| caps[1].to_string())
-  }
+  async fn process(
+    &self,
+    _ctx: Arc<Context>
+  ) -> KonResult<Option<RSSFeedOutput>> {
+    let redis = get_redis().await;
+    let rkey = "RSS_GitHub";
+    let rkey_content = format!("{rkey}_Content");
 
-  let cached_incident = redis.get(rkey).await.unwrap().unwrap_or_default();
-  let new_content = format_html_to_discord(article.content.unwrap().body.unwrap());
+    let res = fetch_feed(self.url()).await?;
+    let data = res.text().await?;
+    let cursor = Cursor::new(data);
 
-  let update_patt = Regex::new(r"(?i)\bupdate\b").unwrap();
-  let investigating_patt = Regex::new(r"(?i)\binvestigating\b").unwrap();
-  let resolved_patt = Regex::new(r"(?i)\bresolved\b").unwrap();
-  let date_patt = Regex::new(r"\b[A-Z][a-z]{2} \d{2}, \d{2}:\d{2} UTC\b").unwrap();
+    let feed = parse(cursor).map_err(|e| {
+      task_err("RSS:GitHub", &format!("Error parsing RSS feed: {e}"));
+      e
+    })?;
 
-  let first_entry = date_patt
-    .split(&new_content)
-    .map(str::trim)
-    .find(|e| !e.is_empty())
-    .unwrap_or(&new_content);
+    if feed.entries.is_empty() {
+      task_err("RSS:GitHub", "No entries found in the feed!");
+      return Ok(None);
+    }
 
-  let color: u32 = if update_patt.is_match(first_entry) {
-    IncidentColorMap::Update.color()
-  } else if investigating_patt.is_match(first_entry) {
-    IncidentColorMap::Investigating.color()
-  } else if resolved_patt.is_match(first_entry) {
-    IncidentColorMap::Resolved.color()
-  } else {
-    IncidentColorMap::Default.color()
-  };
+    let incident_page = feed.entries[0].links[0].clone().href;
+    let article = feed.entries[0].clone();
+
+    fn get_incident_id(input: &str) -> Option<String> {
+      let re = Regex::new(r#"/incidents/([a-zA-Z0-9]+)$"#).unwrap();
+      re.captures(input).map(|caps| caps[1].to_string())
+    }
+
+    let cached_incident = redis.get(rkey).await.unwrap().unwrap_or_default();
+    let new_content = format_html_to_discord(article.content.unwrap().body.unwrap());
+
+    let update_patt = Regex::new(r"(?i)\bupdate\b").unwrap();
+    let investigating_patt = Regex::new(r"(?i)\binvestigating\b").unwrap();
+    let resolved_patt = Regex::new(r"(?i)\bresolved\b").unwrap();
+    let date_patt = Regex::new(r"\b[A-Z][a-z]{2} \d{2}, \d{2}:\d{2} UTC\b").unwrap();
+
+    let first_entry = date_patt
+      .split(&new_content)
+      .map(str::trim)
+      .find(|e| !e.is_empty())
+      .unwrap_or(&new_content);
+
+    let color: u32 = if update_patt.is_match(first_entry) {
+      IncidentColorMap::Update.color()
+    } else if investigating_patt.is_match(first_entry) {
+      IncidentColorMap::Investigating.color()
+    } else if resolved_patt.is_match(first_entry) {
+      IncidentColorMap::Resolved.color()
+    } else {
+      IncidentColorMap::Default.color()
+    };
 
-  if cached_incident.is_empty() {
-    redis.set(rkey, &get_incident_id(&article.links[0].href).unwrap()).await.unwrap();
-    redis.set(&rkey_content, &new_content).await.unwrap();
-    if let Err(y) = redis.expire(rkey, REDIS_EXPIRY_SECS).await {
-      task_err("RSS", format!("[RedisExpiry]: {}", y).as_str());
+    if cached_incident.is_empty() {
+      save_to_redis(rkey, &get_incident_id(&article.links[0].href).unwrap()).await?;
+      save_to_redis(&rkey_content, &new_content).await?;
+      return Ok(None);
     }
-    return Ok(None);
-  }
 
-  if let Some(incident) = get_incident_id(&article.links[0].href) {
-    if incident == cached_incident {
-      let cached_content: String = redis.get(&rkey_content).await.unwrap().unwrap_or_default();
-      if cached_content == new_content {
-        Ok(None)
+    if let Some(incident) = get_incident_id(&article.links[0].href) {
+      if incident == cached_incident {
+        let cached_content = redis.get(&rkey_content).await.unwrap().unwrap_or_default();
+        if cached_content == new_content {
+          Ok(None)
+        } else {
+          redis.set(&rkey_content, &new_content).await.unwrap();
+          redis.expire(&rkey_content, 21600).await.unwrap();
+
+          Ok(Some(RSSFeedOutput::IncidentEmbed(embed(
+            color,
+            article.title.unwrap().content,
+            incident_page,
+            trim_old_content(&new_content),
+            Timestamp::from(article.updated.unwrap())
+          ))))
+        }
       } else {
+        save_to_redis(rkey, &incident).await?;
         redis.set(&rkey_content, &new_content).await.unwrap();
-        redis.expire(&rkey_content, 21600).await.unwrap();
-        Ok(Some(embed(
+
+        Ok(Some(RSSFeedOutput::IncidentEmbed(embed(
           color,
           article.title.unwrap().content,
           incident_page,
           trim_old_content(&new_content),
           Timestamp::from(article.updated.unwrap())
-        )))
+        ))))
       }
     } else {
-      save_to_redis(rkey, &incident).await?;
-      redis.set(&rkey_content, &new_content).await.unwrap();
-      Ok(Some(embed(
-        color,
-        article.title.unwrap().content,
-        incident_page,
-        trim_old_content(&new_content),
-        Timestamp::from(article.updated.unwrap())
-      )))
+      task_err(
+        "RSS:GitHub",
+        &format!("Incident ID does not match the expected RegEx pattern! ({})", &article.links[0].href)
+      );
+      Ok(None)
     }
-  } else {
-    task_err(
-      "RSS:GitHub",
-      &format!("Incident ID does not match the expected RegEx pattern! ({})", &article.links[0].href)
-    );
-    Ok(None)
   }
 }
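Reviewer note: GitHub's history.atom concatenates every status update of an incident into one body, so the handler splits on the "MMM DD, HH:MM UTC" timestamps and classifies only the newest entry to pick the embed colour. A standalone illustration of that step (the sample text is made up):

  use regex::Regex;

  #[test]
  fn classifies_newest_entry() {
    let new_content = "Investigating - We are looking into degraded performance.\nDec 07, 04:12 UTC\nResolved - This incident has been resolved.\nDec 06, 22:40 UTC";
    let date_patt = Regex::new(r"\b[A-Z][a-z]{2} \d{2}, \d{2}:\d{2} UTC\b").unwrap();

    let first_entry = date_patt
      .split(new_content)
      .map(str::trim)
      .find(|e| !e.is_empty())
      .unwrap_or(new_content);

    // The newest entry says "Investigating", so that colour wins even
    // though an older "Resolved" line is still present in the body.
    assert!(first_entry.starts_with("Investigating"));
  }
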
diff --git a/tasks/src/rss/gportal.rs b/tasks/src/rss/gportal.rs
index bfe7a59..999fbdb 100644
--- a/tasks/src/rss/gportal.rs
+++ b/tasks/src/rss/gportal.rs
@@ -1,114 +1,143 @@
 use super::{
-  super::task_err,
   IncidentColorMap,
-  REDIS_EXPIRY_SECS,
+  RSSFeed,
+  RSSFeedOutput,
   embed,
   fetch_feed,
   format_html_to_discord,
   get_redis,
   parse,
   save_to_redis,
+  task_err,
   trim_old_content
 };
 
 use {
   kon_libs::KonResult,
   poise::serenity_prelude::{
-    CreateEmbed,
-    Timestamp
+    Context,
+    Timestamp,
+    async_trait
  },
   regex::Regex,
-  std::io::Cursor
+  std::{
+    io::Cursor,
+    sync::Arc
+  }
 };
 
-pub async fn gportal_embed() -> KonResult<Option<CreateEmbed>> {
-  let redis = get_redis().await;
-  let rkey = "RSS_GPortal";
-  let rkey_content = format!("{}_Content", rkey);
-  let url = "https://status.g-portal.com/history.atom";
+pub struct GPortal {
+  url: String
+}
 
-  let res = fetch_feed(url).await?;
-  let data = res.text().await?;
-  let cursor = Cursor::new(data);
+impl GPortal {
+  pub fn new(url: String) -> Self { Self { url } }
+}
 
-  let feed = parse(cursor).unwrap();
-  let incident_page = feed.links[0].clone().href;
-  let article = feed.entries[0].clone();
+#[async_trait]
+impl RSSFeed for GPortal {
+  fn name(&self) -> &str { "GPortal" }
 
-  fn get_incident_id(input: &str) -> Option<String> {
-    let re = Regex::new(r#"/incidents/([a-zA-Z0-9]+)$"#).unwrap();
+  fn url(&self) -> &str { self.url.as_str() }
 
-    re.captures(input).map(|caps| caps[1].to_string())
-  }
+  async fn process(
+    &self,
+    _ctx: Arc<Context>
+  ) -> KonResult<Option<RSSFeedOutput>> {
+    let redis = get_redis().await;
+    let rkey = "RSS_GPortal";
+    let rkey_content = format!("{rkey}_Content");
 
-  let cached_incident = redis.get(rkey).await.unwrap().unwrap_or_default();
-  let new_content = format_html_to_discord(article.content.unwrap().body.unwrap());
+    let res = fetch_feed(self.url()).await?;
+    let data = res.text().await?;
+    let cursor = Cursor::new(data);
 
-  let update_patt = Regex::new(r"(?i)\bupdate\b").unwrap();
-  let investigating_patt = Regex::new(r"(?i)\binvestigating\b").unwrap();
-  let monitoring_patt = Regex::new(r"(?i)\bmonitoring\b").unwrap();
-  let resolved_patt = Regex::new(r"(?i)\bresolved\b").unwrap();
-  let date_patt = Regex::new(r"\b[A-Z][a-z]{2} \d{2}, \d{2}:\d{2} UTC\b").unwrap();
+    let feed = parse(cursor).map_err(|e| {
+      task_err("RSS:GPortal", &format!("Error parsing RSS feed: {e}"));
+      e
+    })?;
 
-  let first_entry = date_patt
-    .split(&new_content)
-    .map(str::trim)
-    .find(|e| !e.is_empty())
-    .unwrap_or(&new_content);
+    if feed.entries.is_empty() {
+      task_err("RSS:GPortal", "No entries found in the feed!");
+      return Ok(None);
+    }
 
-  let color: u32 = if update_patt.is_match(first_entry) {
-    IncidentColorMap::Update.color()
-  } else if investigating_patt.is_match(first_entry) {
-    IncidentColorMap::Investigating.color()
-  } else if monitoring_patt.is_match(first_entry) {
-    IncidentColorMap::Monitoring.color()
-  } else if resolved_patt.is_match(first_entry) {
-    IncidentColorMap::Resolved.color()
-  } else {
-    IncidentColorMap::Default.color()
-  };
+    let incident_page = feed.links[0].clone().href;
+    let article = feed.entries[0].clone();
+
+    fn get_incident_id(input: &str) -> Option<String> {
+      let re = Regex::new(r#"/incidents/([a-zA-Z0-9]+)$"#).unwrap();
+      re.captures(input).map(|caps| caps[1].to_string())
+    }
+
+    let cached_incident = redis.get(rkey).await.unwrap().unwrap_or_default();
+    let new_content = format_html_to_discord(article.content.unwrap().body.unwrap());
+
+    let update_patt = Regex::new(r"(?i)\bupdate\b").unwrap();
+    let investigating_patt = Regex::new(r"(?i)\binvestigating\b").unwrap();
+    let monitoring_patt = Regex::new(r"(?i)\bmonitoring\b").unwrap();
+    let resolved_patt = Regex::new(r"(?i)\bresolved\b").unwrap();
+    let date_patt = Regex::new(r"\b[A-Z][a-z]{2} \d{2}, \d{2}:\d{2} UTC\b").unwrap();
+
+    let first_entry = date_patt
+      .split(&new_content)
+      .map(str::trim)
+      .find(|e| !e.is_empty())
+      .unwrap_or(&new_content);
+
+    let color: u32 = if update_patt.is_match(first_entry) {
+      IncidentColorMap::Update.color()
+    } else if investigating_patt.is_match(first_entry) {
+      IncidentColorMap::Investigating.color()
+    } else if monitoring_patt.is_match(first_entry) {
+      IncidentColorMap::Monitoring.color()
+    } else if resolved_patt.is_match(first_entry) {
+      IncidentColorMap::Resolved.color()
+    } else {
+      IncidentColorMap::Default.color()
+    };
 
-  if cached_incident.is_empty() {
-    redis.set(rkey, &get_incident_id(&article.links[0].href).unwrap()).await.unwrap();
-    redis.set(&rkey_content, &new_content).await.unwrap();
-    if let Err(y) = redis.expire(rkey, REDIS_EXPIRY_SECS).await {
-      task_err("RSS", format!("[RedisExpiry]: {}", y).as_str());
+    if cached_incident.is_empty() {
+      save_to_redis(rkey, &get_incident_id(&article.links[0].href).unwrap()).await?;
+      save_to_redis(&rkey_content, &new_content).await?;
+      return Ok(None);
     }
-    return Ok(None);
-  }
 
-  if let Some(incident) = get_incident_id(&article.links[0].href) {
-    if incident == cached_incident {
-      let cached_content: String = redis.get(&rkey_content).await.unwrap().unwrap_or_default();
-      if cached_content == new_content {
-        Ok(None)
+    if let Some(incident) = get_incident_id(&article.links[0].href) {
+      if incident == cached_incident {
+        let cached_content = redis.get(&rkey_content).await.unwrap().unwrap_or_default();
+        if cached_content == new_content {
+          Ok(None)
+        } else {
+          redis.set(&rkey_content, &new_content).await.unwrap();
+          redis.expire(&rkey_content, 21600).await.unwrap();
+
+          Ok(Some(RSSFeedOutput::IncidentEmbed(embed(
+            color,
+            article.title.unwrap().content,
+            incident_page,
+            trim_old_content(&new_content),
+            Timestamp::from(article.updated.unwrap())
+          ))))
+        }
       } else {
+        save_to_redis(rkey, &incident).await?;
         redis.set(&rkey_content, &new_content).await.unwrap();
-        redis.expire(&rkey_content, 21600).await.unwrap();
-        Ok(Some(embed(
+
+        Ok(Some(RSSFeedOutput::IncidentEmbed(embed(
           color,
           article.title.unwrap().content,
           incident_page,
           trim_old_content(&new_content),
           Timestamp::from(article.updated.unwrap())
-        )))
+        ))))
       }
     } else {
-      save_to_redis(rkey, &incident).await?;
-      redis.set(&rkey_content, &new_content).await.unwrap();
-      Ok(Some(embed(
-        color,
-        article.title.unwrap().content,
-        incident_page,
-        trim_old_content(&new_content),
-        Timestamp::from(article.updated.unwrap())
-      )))
+      task_err(
+        "RSS:GPortal",
+        &format!("Incident ID does not match the expected RegEx pattern! ({})", &article.links[0].href)
+      );
+      Ok(None)
    }
-  } else {
-    task_err(
-      "RSS:GPortal",
-      &format!("Incident ID does not match the expected RegEx pattern! ({})", &article.links[0].href)
-    );
-    Ok(None)
   }
 }
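Reviewer note: both status-page handlers key their Redis cache on the trailing incident slug of the article link, and the anchored RegEx keeps query strings or deeper paths from slipping through. A standalone illustration (the URLs are made up):

  use regex::Regex;

  #[test]
  fn extracts_incident_id() {
    let re = Regex::new(r#"/incidents/([a-zA-Z0-9]+)$"#).unwrap();

    let id = re
      .captures("https://status.g-portal.com/incidents/abc123XYZ")
      .map(|caps| caps[1].to_string());
    assert_eq!(id, Some("abc123XYZ".to_string()));

    // The $ anchor rejects anything after the ID, e.g. a query string.
    assert!(re.captures("https://status.g-portal.com/incidents/abc123XYZ?page=2").is_none());
  }
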
diff --git a/tasks/src/rss/processor.rs b/tasks/src/rss/processor.rs
index 87e0bfb..1dfaf46 100644
--- a/tasks/src/rss/processor.rs
+++ b/tasks/src/rss/processor.rs
@@ -4,13 +4,10 @@ use kon_libs::{
 };
 
 use super::{
+  RSSFeedBox,
+  RSSFeedOutput,
   TASK_NAME,
-  esxi::esxi_embed,
-  get_redis,
-  github::github_embed,
-  gportal::gportal_embed,
-  rust::rust_message,
-  task_err
+  get_redis
 };
 
@@ -19,9 +16,11 @@ use {
     Context,
     CreateEmbed,
     CreateMessage,
-    EditMessage
+    EditMessage,
+    Http
   },
   regex::Regex,
+  std::sync::Arc,
   tokio::time::{
     Duration,
     sleep
@@ -32,29 +31,52 @@ use {
 /* std::fs::File::create("rss_name.log").unwrap();
    std::fs::write("rss_name.log", format!("{:#?}", feed))?; */
 
-// todo; have a reusable function for feeding RSS data and building the embed out of it.
-// see github.rs / esxi.rs / gportal.rs for references of this idea.
+async fn process_regular_embed(
+  http: &Http,
+  embed: CreateEmbed,
+  redis_key: &str
+) -> KonResult<()> {
+  let redis = get_redis().await;
+  let channel = ChannelId::new(BINARY_PROPERTIES.rss_channel);
 
-async fn process_embed(
-  ctx: &Context,
-  embed: Option<CreateEmbed>,
+  let msg_id_key: Option<String> = redis.get(redis_key).await?;
+
+  if let Some(msg_id_key) = msg_id_key {
+    if let Ok(msg_id) = msg_id_key.parse::<u64>() {
+      if let Ok(mut message) = channel.message(http, msg_id).await {
+        message.edit(http, EditMessage::new().embed(embed)).await?;
+      }
+    }
+  } else {
+    let message = channel.send_message(http, CreateMessage::new().add_embed(embed)).await?;
+    redis.set(redis_key, &message.id.to_string()).await?;
+    redis.expire(redis_key, 36000).await?;
+  }
+
+  Ok(())
+}
+
+/// Cache-based embed updater for ongoing outages/incidents
+async fn process_incident_embed(
+  http: &Http,
+  embed: CreateEmbed,
   redis_key: &str,
   content_key: &str
 ) -> KonResult<()> {
-  if let Some(embed) = embed {
-    let redis = get_redis().await;
-    let channel = ChannelId::new(BINARY_PROPERTIES.rss_channel);
+  let redis = get_redis().await;
+  let channel = ChannelId::new(BINARY_PROPERTIES.rss_channel);
 
-    let msg_id_key: Option<String> = redis.get(redis_key).await?;
-    let cached_content: Option<String> = redis.get(content_key).await.unwrap_or(None);
+  let msg_id_key: Option<String> = redis.get(redis_key).await?;
+  let cached_content: Option<String> = redis.get(content_key).await.unwrap_or(None);
 
-    if let Some(msg_id_key) = msg_id_key {
-      if let Ok(msg_id) = msg_id_key.parse::<u64>() {
-        if let Ok(mut message) = channel.message(&ctx.http, msg_id).await {
-          let new_description = message.embeds[0].description.clone().unwrap();
+  if let Some(msg_id_key) = msg_id_key {
+    if let Ok(msg_id) = msg_id_key.parse::<u64>() {
+      if let Ok(mut message) = channel.message(http, msg_id).await {
+        if let Some(existing) = message.embeds.first() {
+          let new_description = existing.description.clone().unwrap();
 
           if cached_content.as_deref() != Some(&new_description) {
-            message.edit(&ctx.http, EditMessage::new().embed(embed)).await?;
+            message.edit(http, EditMessage::new().embed(embed)).await?;
           }
 
           sleep(Duration::from_secs(15)).await;
@@ -64,81 +86,94 @@ async fn process_incident_embed(
         }
       }
     }
-  } else {
-    let message = channel.send_message(&ctx.http, CreateMessage::new().add_embed(embed)).await?;
-    redis.set(redis_key, &message.id.to_string()).await?;
-    redis.expire(redis_key, 36000).await?;
   }
+  } else {
+    let message = channel.send_message(http, CreateMessage::new().add_embed(embed)).await?;
+    redis.set(redis_key, &message.id.to_string()).await?;
+    redis.expire(redis_key, 36000).await?;
+  }
 
   Ok(())
 }
 
-pub async fn feed_processor(ctx: &Context) {
-  let mut log_msgs: Vec<String> = Vec::new();
+/// Process the content string
+async fn process_msg_content(
+  http: &Http,
+  content: String,
+  redis_key: &str
+) -> KonResult<()> {
+  let redis = get_redis().await;
+  let channel = ChannelId::new(BINARY_PROPERTIES.rss_channel);
 
-  match esxi_embed().await {
-    Ok(Some(embed)) => {
-      ChannelId::new(BINARY_PROPERTIES.rss_channel)
-        .send_message(&ctx.http, CreateMessage::new().add_embed(embed))
-        .await
-        .unwrap();
-    },
-    Ok(None) => (),
-    Err(y) => {
-      log_msgs.push(format!(
-        "**[{TASK_NAME}:ESXi:Error]:** Feed failed with the following error:```\n{}\n```",
-        y
-      ));
-      task_err(TASK_NAME, &y.to_string())
+  let msg_id_key: Option<String> = redis.get(redis_key).await?;
+
+  if let Some(msg_id_key) = msg_id_key {
+    if let Ok(msg_id) = msg_id_key.parse::<u64>() {
+      channel.edit_message(http, msg_id, EditMessage::new().content(content)).await?;
     }
+  } else {
+    let message = channel.send_message(http, CreateMessage::new().content(content)).await?;
+    redis.set(redis_key, &message.id.to_string()).await?;
+    redis.expire(redis_key, 36000).await?;
   }
 
-  match gportal_embed().await {
-    Ok(Some(embed)) => process_embed(ctx, Some(embed), "RSS_GPortal_MsgID", "RSS_GPortal_Content").await.unwrap(),
-    Ok(None) => (),
-    Err(y) => {
-      log_msgs.push(format!(
-        "**[{TASK_NAME}:GPortal:Error]:** Feed failed with the following error:```\n{}\n```",
-        y
-      ));
-      task_err(TASK_NAME, &y.to_string())
-    }
+  Ok(())
+}
+
+pub struct RSSProcessor {
+  pub feeds: Vec<RSSFeedBox>
+}
+
+impl RSSProcessor {
+  pub fn new() -> Self { Self { feeds: Vec::new() } }
+
+  pub fn add_feed(
+    &mut self,
+    feed: RSSFeedBox
+  ) {
+    self.feeds.push(feed);
   }
 
-  match github_embed().await {
-    Ok(Some(embed)) => process_embed(ctx, Some(embed), "RSS_GitHub_MsgID", "RSS_GitHub_Content").await.unwrap(),
-    Ok(None) => (),
-    Err(y) => {
-      log_msgs.push(format!(
-        "**[{TASK_NAME}:GitHub:Error]:** Feed failed with the following error:```\n{}\n```",
-        y
-      ));
-      task_err(TASK_NAME, &y.to_string())
-    }
-  }
+  pub async fn process_all(
+    &self,
+    ctx: Arc<Context>
+  ) -> KonResult<()> {
+    let mut discord_msg: Vec<String> = Vec::new();
 
-  match rust_message().await {
-    Ok(Some(content)) => {
-      ChannelId::new(BINARY_PROPERTIES.rss_channel)
-        .send_message(&ctx.http, CreateMessage::new().content(content))
-        .await
-        .unwrap();
-    },
-    Ok(None) => (),
-    Err(y) => {
-      log_msgs.push(format!(
-        "**[{TASK_NAME}:RustBlog:Error]:** Feed failed with the following error:```\n{}\n```",
-        y
-      ));
-      task_err(TASK_NAME, &y.to_string())
-    }
-  }
+    for feed in &self.feeds {
+      let feed_name = feed.name();
+      let redis_key = format!("RSS_{feed_name}_MsgId");
+      let error_msg = format!("**[{TASK_NAME}:{feed_name}:Error]:** Feed failed with the following error:```\n{{ error }}\n```");
 
-  if !log_msgs.is_empty() {
-    ChannelId::new(BINARY_PROPERTIES.kon_logs)
-      .send_message(&ctx.http, CreateMessage::new().content(log_msgs.join("\n")))
-      .await
-      .unwrap();
+      match feed.process(ctx.clone()).await {
+        Ok(Some(output)) => match output {
+          RSSFeedOutput::RegularEmbed(embed) => {
+            if let Err(e) = process_regular_embed(&ctx.http, embed, &redis_key).await {
+              discord_msg.push(error_msg.replace("{ error }", &e.to_string()))
+            }
+          },
+          RSSFeedOutput::IncidentEmbed(embed) => {
+            if let Err(e) = process_incident_embed(&ctx.http, embed, &redis_key, &format!("RSS_{feed_name}_Content")).await {
+              discord_msg.push(error_msg.replace("{ error }", &e.to_string()))
+            }
+          },
+          RSSFeedOutput::Content(content) => {
+            if let Err(e) = process_msg_content(&ctx.http, content, &redis_key).await {
+              discord_msg.push(error_msg.replace("{ error }", &e.to_string()))
+            }
+          },
+        },
+        Ok(None) => (),
+        Err(e) => discord_msg.push(error_msg.replace("{ error }", &e.to_string()))
+      }
+    }
+
+    if !discord_msg.is_empty() {
+      ChannelId::new(BINARY_PROPERTIES.kon_logs)
+        .send_message(&ctx.http, CreateMessage::new().content(discord_msg.join("\n")))
+        .await?;
+    }
+
+    Ok(())
   }
 }
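Reviewer note: all three handlers share the same send-or-edit shape: look up a cached Discord message ID under the feed's Redis key, edit that message when it still resolves, otherwise send a fresh one and cache its ID with a 36000-second (10-hour) expiry. Distilled, the pattern looks like this (the helper name is hypothetical, not part of this patch):

  // Sketch of the shared upsert flow; each handler applies its own
  // EditMessage/CreateMessage payload where the comments stand.
  async fn upsert_feed_message(
    http: &Http,
    channel: ChannelId,
    redis_key: &str,
    msg: CreateMessage
  ) -> KonResult<()> {
    let redis = get_redis().await;

    match redis.get(redis_key).await? {
      Some(id) if id.parse::<u64>().is_ok() => {
        // Cached ID found: the handlers edit the existing message here
        // instead of posting a duplicate.
      },
      _ => {
        // No usable cache entry: post fresh and remember the message ID.
        let message = channel.send_message(http, msg).await?;
        redis.set(redis_key, &message.id.to_string()).await?;
        redis.expire(redis_key, 36000).await?;
      }
    }

    Ok(())
  }
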
diff --git a/tasks/src/rss/rust.rs b/tasks/src/rss/rust.rs
index bddb381..60864af 100644
--- a/tasks/src/rss/rust.rs
+++ b/tasks/src/rss/rust.rs
@@ -1,5 +1,6 @@
 use super::{
-  REDIS_EXPIRY_SECS,
+  RSSFeed,
+  RSSFeedOutput,
   fetch_feed,
   get_redis,
   parse,
@@ -9,54 +10,85 @@ use super::{
 
 use {
   kon_libs::KonResult,
+  poise::serenity_prelude::{
+    Context,
+    async_trait
+  },
   regex::Regex,
-  std::io::Cursor
+  std::{
+    io::Cursor,
+    sync::Arc
+  }
 };
 
-pub async fn rust_message() -> KonResult<Option<String>> {
-  let redis = get_redis().await;
-  let rkey = "RSS_RustBlog";
-  let url = "https://blog.rust-lang.org/feed.xml";
+pub struct RustBlog {
+  url: String
+}
 
-  let res = fetch_feed(url).await?;
-  let data = res.text().await?;
-  let cursor = Cursor::new(data);
+impl RustBlog {
+  pub fn new(url: String) -> Self { Self { url } }
+}
 
-  let feed = parse(cursor).unwrap();
-  let article = feed.entries[0].clone();
-  let article_id = article.id.clone();
+#[async_trait]
+impl RSSFeed for RustBlog {
+  fn name(&self) -> &str { "RustBlog" }
 
-  fn get_blog_title(input: String) -> Option<String> {
-    let re = Regex::new(r"https://blog\.rust-lang\.org/(\d{4}/\d{2}/\d{2}/[^/]+)").unwrap();
-    re.captures(input.as_str()).and_then(|caps| caps.get(1).map(|m| m.as_str().to_string()))
-  }
+  fn url(&self) -> &str { self.url.as_str() }
 
-  let cached_blog = redis.get(rkey).await.unwrap().unwrap_or_default();
+  async fn process(
+    &self,
+    _ctx: Arc<Context>
+  ) -> KonResult<Option<RSSFeedOutput>> {
+    let redis = get_redis().await;
+    let rkey = "RSS_RustBlog";
 
-  if cached_blog.is_empty() {
-    redis.set(rkey, get_blog_title(article.id).unwrap().as_str()).await.unwrap();
-    if let Err(y) = redis.expire(rkey, REDIS_EXPIRY_SECS).await {
-      task_err("RSS", format!("[RedisExpiry]: {}", y).as_str());
+    let res = fetch_feed(self.url()).await?;
+    let data = res.text().await?;
+    let cursor = Cursor::new(data);
+
+    let feed = parse(cursor).map_err(|e| {
+      task_err("RSS:RustBlog", &format!("Error parsing RSS feed: {e}"));
+      e
+    })?;
+
+    if feed.entries.is_empty() {
+      task_err("RSS:RustBlog", "No entries found in the feed!");
+      return Ok(None);
     }
-    return Ok(None);
-  }
 
-  if let Some(blog) = get_blog_title(article.id) {
-    if blog == cached_blog {
-      Ok(None)
+    let article = feed.entries[0].clone();
+    let article_id = article.id.clone();
+
+    fn get_blog_title(input: String) -> Option<String> {
+      let re = Regex::new(r"https://blog\.rust-lang\.org/(\d{4}/\d{2}/\d{2}/[^/]+)").unwrap();
+      re.captures(input.as_str()).and_then(|caps| caps.get(1).map(|m| m.as_str().to_string()))
+    }
+
+    let cached_blog = redis.get(rkey).await.unwrap_or(None).unwrap_or_default();
+
+    if cached_blog.is_empty() {
+      save_to_redis(rkey, &get_blog_title(article.id).unwrap()).await?;
+      return Ok(None);
+    }
+
+    if let Some(blog_title) = get_blog_title(article.id) {
+      if blog_title == cached_blog {
+        Ok(None)
+      } else {
+        save_to_redis(rkey, &blog_title).await?;
+
+        Ok(Some(RSSFeedOutput::Content(format!(
+          "Rust Team has put out a new article!\n**[{}](<{}>)**",
+          article.links[0].title.clone().unwrap(),
+          article.links[0].href
+        ))))
+      }
     } else {
-      save_to_redis(rkey, &blog).await?;
-      Ok(Some(format!(
-        "Rust Team has put out a new article!\n**[{}](<{}>)**",
-        article.links[0].title.clone().unwrap(),
-        article.links[0].href
-      )))
+      task_err(
+        "RSS:RustBlog",
+        &format!("Article URL does not match the expected RegEx pattern! ({article_id})")
+      );
+      Ok(None)
     }
-  } else {
-    task_err(
-      "RSS:RustBlog",
-      &format!("Article URL does not match the expected RegEx pattern! ({})", article_id)
-    );
-    Ok(None)
   }
 }
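
Reviewer note: the Rust blog cache key is the date-and-slug path segment of the article URL, so a repost only happens when that segment changes. A standalone illustration of the extraction (the article URL is made up):

  use regex::Regex;

  #[test]
  fn extracts_blog_slug() {
    let re = Regex::new(r"https://blog\.rust-lang\.org/(\d{4}/\d{2}/\d{2}/[^/]+)").unwrap();

    let key = re
      .captures("https://blog.rust-lang.org/2024/11/28/Rust-1.83.0.html")
      .and_then(|caps| caps.get(1).map(|m| m.as_str().to_string()));

    assert_eq!(key, Some("2024/11/28/Rust-1.83.0.html".to_string()));
  }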