Pattern Overview
External monitoring watches outside sources for changes, processes them with AI, and distributes insights.
The Flow
Monitor External Source (RSS, APIs, websites)
↓
Detect Changes (new articles, updates)
↓
Process with AI (summarize, extract insights)
↓
Distribute Results (Slack, email, dashboard)
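The four steps map naturally onto a small pipeline loop. As a rough sketch of the shape (the function names fetchSource, findNew, summarize, and deliver are placeholders, not part of the example that follows):

// Hypothetical shape of one monitoring cycle; each step is detailed below.
type Item = { id: string; title: string; content: string };

async function runMonitorCycle(
  fetchSource: () => Promise<Item[]>,          // RSS, API, or scraper
  findNew: (items: Item[]) => Promise<Item[]>, // dedupe against stored state
  summarize: (item: Item) => Promise<string>,  // AI processing step
  deliver: (summary: string) => Promise<void>  // Slack, email, dashboard
) {
  const items = await fetchSource();
  const fresh = await findNew(items);
  for (const item of fresh) {
    const summary = await summarize(item);
    await deliver(summary);
  }
}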
Common Use Cases
- Content: Competitor blogs → Summarize → Share with team
- Data: API changes → Document → Notify developers
- Social: Brand mentions → Sentiment analysis → Alert PR
- Regulatory: Policy changes → Analyze → Compliance alerts
Example: RSS Summarizer Agent
The Problem
Your team manually monitors 20+ industry blogs, which takes 2-3 hours per day. You need automated monitoring, summarization, and distribution.
Architecture
Scheduled Trigger (hourly)
↓
RSS Poller (fetch feeds)
↓
Change Detection (find new articles)
↓
Summarizer Agent (AI summaries)
↓
Distribution (Slack + Email)
Setup
Dependencies
npm install @mastra/core @ai-sdk/openai zod
npm install rss-parser @slack/web-api resend
npm install prisma @prisma/client
Configuration
// feeds.config.ts
export interface FeedConfig {
  id: string;
  name: string;
  url: string;
  priority: 'high' | 'medium' | 'low';
}

export const feeds: FeedConfig[] = [
  {
    id: 'techcrunch',
    name: 'TechCrunch',
    url: 'https://techcrunch.com/feed/',
    priority: 'high',
  },
  {
    id: 'verge',
    name: 'The Verge',
    url: 'https://www.theverge.com/rss/index.xml',
    priority: 'medium',
  },
];
RSS Polling
// lib/rss-poller.ts
import Parser from 'rss-parser';
import type { FeedConfig } from '@/feeds.config';

export interface Article {
  guid: string;
  title: string;
  link: string;
  pubDate: string;
  content: string;
}

export class RSSPoller {
  private parser = new Parser();

  async fetchFeed(url: string): Promise<Article[]> {
    const feed = await this.parser.parseURL(url);
    return feed.items.map(item => ({
      guid: item.guid || item.link || '',
      title: item.title || '',
      link: item.link || '',
      pubDate: item.pubDate || '',
      content: item.content || item.contentSnippet || '',
    }));
  }

  async pollAll(feeds: FeedConfig[]) {
    const results = await Promise.allSettled(
      feeds.map(async (feed) => {
        const articles = await this.fetchFeed(feed.url);
        return { feedId: feed.id, articles };
      })
    );

    // Keep only feeds that fetched successfully
    return results
      .filter(
        (r): r is PromiseFulfilledResult<{ feedId: string; articles: Article[] }> =>
          r.status === 'fulfilled'
      )
      .map(r => r.value);
  }
}
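To sanity-check the poller before wiring up the full pipeline, a throwaway script like the one below works. This is a sketch; the file name and relative import paths are assumptions, adjust them to your project layout.

// scripts/test-poller.ts (hypothetical helper, not part of the pipeline)
import { RSSPoller } from '../lib/rss-poller';
import { feeds } from '../feeds.config';

async function main() {
  const poller = new RSSPoller();
  const results = await poller.pollAll(feeds);
  for (const { feedId, articles } of results) {
    console.log(`${feedId}: ${articles.length} articles, latest: ${articles[0]?.title}`);
  }
}

main().catch(console.error);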
Change Detection
// lib/change-detector.ts
import { db } from './db'; // shared PrismaClient instance
import type { Article } from './rss-poller';

export class ChangeDetector {
  async detectNew(feedId: string, articles: Article[]) {
    const existing = await db.processedArticle.findMany({
      where: { feedId },
      select: { guid: true },
    });
    const existingGuids = new Set(existing.map(a => a.guid));
    return articles.filter(a => !existingGuids.has(a.guid));
  }

  async markProcessed(feedId: string, articles: Article[]) {
    await db.processedArticle.createMany({
      data: articles.map(a => ({
        feedId,
        guid: a.guid,
        title: a.title,
        link: a.link,
        processedAt: new Date(),
      })),
      skipDuplicates: true,
    });
  }
}
Content Processing
Summarizer Agent
// src/mastra/agents/summarizer.ts
import { Agent } from '@mastra/core';
import { z } from 'zod';
import type { Article } from '@/lib/rss-poller';

const SummarySchema = z.object({
  summary: z.string(),
  keyPoints: z.array(z.string()),
  relevance: z.enum(['high', 'medium', 'low']),
  category: z.string(),
  actionable: z.boolean(),
});

export type Summary = z.infer<typeof SummarySchema>;

export const summarizerAgent = new Agent({
  name: 'Article Summarizer',
  instructions: `Analyze articles and provide concise summaries. Focus on:
- Key takeaways (2-3 sentences)
- 3-5 bullet points
- Relevance to our team
- Actionable insights`,
  model: {
    provider: 'OPEN_AI',
    name: 'gpt-4o-mini',
  },
});

export async function summarizeArticle(article: Article) {
  const result = await summarizerAgent.generate(
    `Analyze: ${article.title}\n\n${article.content}`,
    { output: SummarySchema }
  );
  return result.object;
}
Batch Processing
// lib/batch-processor.ts
import { summarizeArticle } from '@/mastra/agents/summarizer';
import type { Article } from './rss-poller';

export async function processBatch(articles: Article[], batchSize = 5) {
  const results = [];
  for (let i = 0; i < articles.length; i += batchSize) {
    const batch = articles.slice(i, i + batchSize);
    const batchResults = await Promise.all(
      batch.map(article => summarizeArticle(article))
    );
    results.push(...batchResults);

    // Rate limiting: pause between batches
    await new Promise(r => setTimeout(r, 1000));
  }
  return results;
}
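If the model provider still returns rate-limit errors despite the pause, a small retry wrapper with exponential backoff helps. This is a sketch under the assumption that transient failures throw; withRetry is a hypothetical helper, not part of the original pipeline.

// lib/with-retry.ts (hypothetical helper)
export async function withRetry<T>(
  fn: () => Promise<T>,
  attempts = 3,
  baseDelayMs = 2000
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt < attempts; attempt++) {
    try {
      return await fn();
    } catch (error) {
      lastError = error;
      // Exponential backoff: 2s, 4s, 8s...
      await new Promise(r => setTimeout(r, baseDelayMs * 2 ** attempt));
    }
  }
  throw lastError;
}

In processBatch, the call summarizeArticle(article) could then become withRetry(() => summarizeArticle(article)).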
Distribution
Slack Integration
// src/mastra/tools/post-slack.ts
import { WebClient } from '@slack/web-api';
import type { Article } from '@/lib/rss-poller';
import type { Summary } from '@/mastra/agents/summarizer';

const slack = new WebClient(process.env.SLACK_BOT_TOKEN);

export interface ArticleSummary {
  article: Article;
  summary: Summary;
}

export async function postToSlack(summary: ArticleSummary, channel: string) {
  const { article, summary: content } = summary;

  await slack.chat.postMessage({
    channel,
    text: article.title,
    blocks: [
      {
        type: 'header',
        text: { type: 'plain_text', text: article.title },
      },
      {
        type: 'section',
        text: { type: 'mrkdwn', text: `*Summary*\n${content.summary}` },
      },
      {
        type: 'section',
        text: {
          type: 'mrkdwn',
          text: `*Key Points*\n${content.keyPoints.map(p => `• ${p}`).join('\n')}`,
        },
      },
      {
        type: 'actions',
        elements: [{
          type: 'button',
          text: { type: 'plain_text', text: 'Read Article' },
          url: article.link,
        }],
      },
    ],
  });
}
Email Digest
// lib/email-digest.ts
import { Resend } from 'resend';
import type { ArticleSummary } from '@/mastra/tools/post-slack';

const resend = new Resend(process.env.RESEND_API_KEY);

export async function sendDigest(summaries: ArticleSummary[], to: string[]) {
  const html = `
    <h1>Daily Feed Digest</h1>
    ${summaries.map(s => `
      <div style="margin: 20px 0; padding: 15px; border-left: 4px solid #3b82f6;">
        <h2>${s.article.title}</h2>
        <p>${s.summary.summary}</p>
        <ul>
          ${s.summary.keyPoints.map(p => `<li>${p}</li>`).join('')}
        </ul>
        <a href="${s.article.link}">Read Full Article</a>
      </div>
    `).join('')}
  `;

  await resend.emails.send({
    from: 'digest@yourcompany.com',
    to,
    subject: `Daily Feed Digest - ${new Date().toLocaleDateString()}`,
    html,
  });
}
State Management
Database Schema
model ProcessedArticle {
  id          String   @id @default(cuid())
  feedId      String
  guid        String   @unique
  title       String
  link        String
  processedAt DateTime @default(now())

  @@index([feedId])
}

model ArticleSummary {
  id        String @id @default(cuid())
  articleId String
  summary   Json

  @@index([articleId])
}
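The pipeline below only marks articles as processed; the ArticleSummary model is there if you also want to persist the AI output, for example to build the email digest from stored summaries instead of re-calling the model. A possible sketch, assuming the same db Prisma client used by the change detector (saveSummary is a hypothetical helper):

// lib/summary-store.ts (hypothetical helper built on the schema above)
import { db } from './db';
import type { Prisma } from '@prisma/client';

// Persist a model-generated summary so a later job (e.g. the daily digest)
// can reuse it without another model call.
export async function saveSummary(articleId: string, summary: Prisma.InputJsonValue) {
  await db.articleSummary.create({
    data: { articleId, summary },
  });
}

In the pipeline below, saveSummary could be called right after summarizeArticle, using the article's guid as articleId.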
Complete Pipeline
// app/api/cron/poll-feeds/route.ts
import { NextRequest, NextResponse } from 'next/server';
import { RSSPoller } from '@/lib/rss-poller';
import { ChangeDetector } from '@/lib/change-detector';
import { summarizeArticle } from '@/mastra/agents/summarizer';
import { postToSlack } from '@/mastra/tools/post-slack';
import { feeds } from '@/feeds.config';

export async function GET(request: NextRequest) {
  // Verify cron secret
  if (request.headers.get('authorization') !== `Bearer ${process.env.CRON_SECRET}`) {
    return NextResponse.json({ error: 'Unauthorized' }, { status: 401 });
  }

  const poller = new RSSPoller();
  const detector = new ChangeDetector();

  // Poll all feeds
  const results = await poller.pollAll(feeds);
  let totalProcessed = 0;

  for (const result of results) {
    // Find new articles
    const newArticles = await detector.detectNew(result.feedId, result.articles);
    if (newArticles.length === 0) continue;

    console.log(`${result.feedId}: ${newArticles.length} new articles`);

    // Summarize each article
    for (const article of newArticles) {
      try {
        const summary = await summarizeArticle(article);

        // Post to Slack if relevant
        if (summary.relevance === 'high' || summary.actionable) {
          await postToSlack(
            { article, summary },
            process.env.SLACK_CHANNEL_ID!
          );
        }

        totalProcessed++;
      } catch (error) {
        console.error(`Failed to process ${article.title}:`, error);
      }
    }

    // Mark as processed
    await detector.markProcessed(result.feedId, newArticles);
  }

  return NextResponse.json({
    success: true,
    processed: totalProcessed,
  });
}
Vercel Cron Setup
// vercel.json
{
  "crons": [
    {
      "path": "/api/cron/poll-feeds",
      "schedule": "0 * * * *"
    }
  ]
}
Key Takeaways
- External monitoring - Watch sources, detect changes, process, distribute
- Change detection - Track processed items to avoid duplicates
- Batch processing - Handle rate limits, process efficiently
- Rich distribution - Slack blocks, email digests, filtered by relevance
- State management - Database tracking, deduplication, metadata
Pattern 2 complete! Next: Pattern 3 (Event Preparation).