## Introduction
In today’s information-rich world, staying on top of research in any field can be overwhelming. This guide will walk you through creating your own AI-driven research machine that can automatically gather, process, and summarize information on topics of interest. By the end, you’ll have a powerful tool that saves time while keeping you informed about the latest developments in your field.
## Prerequisites
- Basic programming knowledge (Python)
- Familiarity with command line interfaces
- A computer with internet access
- Basic understanding of APIs
## System Architecture Overview
Our AI research machine consists of five main components:
- Data Collection Module: Gathers information from various sources
- Processing Pipeline: Cleans and structures the collected data
- Analysis Engine: Extracts insights and organizes information
- Storage System: Maintains a knowledge database of findings
- User Interface: Provides access to research results
Let’s build each component step by step.
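To make the data flow concrete before we start, here is a minimal preview of how the first four components will fit together once they exist. The class names match the modules built in Parts 2-5, and the user interface from Part 6 then exposes the saved results through an API and CLI; treat this as a sketch rather than code you can run yet.

```python
# Preview: how the components will connect (built in Parts 2-5)
from ai_research_machine.data_collection.collector import DataCollector
from ai_research_machine.processing.pipeline import ProcessingPipeline
from ai_research_machine.analysis.research_analyzer import ResearchAnalyzer
from ai_research_machine.storage.document_store import DocumentStore

def run_research(query: str) -> str:
    documents = DataCollector().collect(query)                      # 1. gather raw documents
    processed = ProcessingPipeline().process_batch(documents)       # 2. clean and structure them
    analysis = ResearchAnalyzer().analyze(processed)                 # 3. keywords, topics, insights
    return DocumentStore().save_research_results(query, analysis)   # 4. persist and return an ID

research_id = run_research("quantum computing recent advances")
```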
## Part 1: Setting Up Your Environment
### Step 1: Install Required Software
```bash
# Create a virtual environment
python -m venv ai_research_env
source ai_research_env/bin/activate # On Windows: ai_research_env\Scripts\activate
# Install core dependencies
pip install requests beautifulsoup4 nltk scikit-learn pandas numpy
pip install openai langchain chromadb pdfplumber arxiv pymupdf
pip install flask python-dotenv schedule tabulate
```
### Step 2: Create Project Structure
```bash
mkdir -p ai_research_machine/{data_collection,processing,analysis,storage,ui}
touch ai_research_machine/__init__.py
touch ai_research_machine/{data_collection,processing,analysis,storage,ui}/__init__.py
```
### Step 3: Configure API Keys
Create a `.env` file in your project root:
```
OPENAI_API_KEY=your_openai_api_key
NEWS_API_KEY=your_newsapi_key
GOOGLE_CSE_ID=your_google_custom_search_engine_id
GOOGLE_API_KEY=your_google_api_key
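```

Before moving on, it is worth confirming the keys actually load. Here is a quick optional sanity check, saved as (say) `check_env.py` in the project root; it only verifies that the variables above are present, not that the keys are valid:

```python
# check_env.py - optional: confirm the .env values are visible to Python
import os
from dotenv import load_dotenv

load_dotenv()  # reads .env from the current working directory

for key in ["OPENAI_API_KEY", "NEWS_API_KEY", "GOOGLE_CSE_ID", "GOOGLE_API_KEY"]:
    print(f"{key}: {'set' if os.getenv(key) else 'MISSING'}")
```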
## Part 2: Building the Data Collection Module
### Step 1: Define Source Types
Create `data_collection/sources.py`:
```python
from abc import ABC, abstractmethod
import requests
import arxiv
import os
from bs4 import BeautifulSoup
from dotenv import load_dotenv
load_dotenv()
class DataSource(ABC):
@abstractmethod
def fetch(self, query, limit=10):
pass
class ArxivSource(DataSource):
def fetch(self, query, limit=10):
client = arxiv.Client(page_size=limit,
delay_seconds=3,
num_retries=3)
search = arxiv.Search(
query=query,
max_results=limit,
sort_by=arxiv.SortCriterion.SubmittedDate
)
results = list(client.results(search))
return [{
'title': paper.title,
'authors': [author.name for author in paper.authors],
'summary': paper.summary,
'published': paper.published,
'url': paper.pdf_url,
'source_type': 'arxiv',
'content_type': 'pdf'
} for paper in results]
class NewsAPISource(DataSource):
def fetch(self, query, limit=10):
api_key = os.getenv('NEWS_API_KEY')
url = f"https://newsapi.org/v2/everything?q={query}&apiKey={api_key}&pageSize={limit}"
response = requests.get(url)
if response.status_code != 200:
return []
data = response.json()
articles = data.get('articles', [])
return [{
'title': article.get('title'),
'authors': [article.get('author')] if article.get('author') else [],
'summary': article.get('description'),
'published': article.get('publishedAt'),
'url': article.get('url'),
'source_type': 'news',
'content_type': 'web'
} for article in articles[:limit]]
class WebpageSource(DataSource):
def fetch(self, query, limit=10):
api_key = os.getenv('GOOGLE_API_KEY')
cse_id = os.getenv('GOOGLE_CSE_ID')
url = f"https://www.googleapis.com/customsearch/v1?key={api_key}&cx={cse_id}&q={query}"
response = requests.get(url)
if response.status_code != 200:
return []
data = response.json()
items = data.get('items', [])
results = []
for item in items[:limit]:
try:
page_response = requests.get(item.get('link'), timeout=5)
if page_response.status_code == 200:
soup = BeautifulSoup(page_response.text, 'html.parser')
# Extract main content (simplified)
content = soup.find('body').get_text(separator=' ', strip=True)
results.append({
'title': item.get('title'),
'authors': [], # Often not available
'summary': item.get('snippet'),
'published': None, # Often not available
'url': item.get('link'),
'source_type': 'web',
'content_type': 'web',
'content': content[:5000] # Truncate for practical purposes
})
except Exception as e:
print(f"Error fetching {item.get('link')}: {e}")
return results
```
### Step 2: Create Data Collector
Create `data_collection/collector.py`:
```python
from .sources import ArxivSource, NewsAPISource, WebpageSource
import threading
from queue import Queue
class DataCollector:
def __init__(self):
self.sources = {
'arxiv': ArxivSource(),
'news': NewsAPISource(),
'web': WebpageSource()
}
def collect(self, query, sources=None, limit_per_source=10):
"""
Collect data from specified sources in parallel
"""
if sources is None:
sources = list(self.sources.keys())
result_queue = Queue()
threads = []
def fetch_from_source(source_name):
try:
source = self.sources.get(source_name)
if source:
results = source.fetch(query, limit_per_source)
result_queue.put((source_name, results))
else:
result_queue.put((source_name, []))
except Exception as e:
print(f"Error in {source_name} source: {e}")
result_queue.put((source_name, []))
# Start a thread for each source
for source_name in sources:
thread = threading.Thread(target=fetch_from_source, args=(source_name,))
thread.start()
threads.append(thread)
# Wait for all threads to complete
for thread in threads:
thread.join()
# Collect results
all_results = []
while not result_queue.empty():
source_name, results = result_queue.get()
all_results.extend(results)
return all_results
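```

A quick usage sketch for the collector (run from the project root so the package import resolves; the query, source list, and limit are just examples):

```python
# Example: collect a handful of documents from arXiv and NewsAPI only
from ai_research_machine.data_collection.collector import DataCollector

collector = DataCollector()
docs = collector.collect("large language models", sources=["arxiv", "news"], limit_per_source=5)
for doc in docs:
    print(f"[{doc['source_type']}] {doc['title']}")
```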
## Part 3: Building the Processing Pipeline
### Step 1: Document Processor
Create `processing/document_processor.py`:
```python
import pdfplumber
import requests
from bs4 import BeautifulSoup
import nltk
from nltk.tokenize import sent_tokenize
import re
# Download required NLTK data
nltk.download('punkt', quiet=True)
class DocumentProcessor:
def __init__(self):
pass
def process_document(self, document):
"""Process a document based on its content type"""
content_type = document.get('content_type')
if content_type == 'pdf' and 'content' not in document:
document['content'] = self._extract_pdf_content(document['url'])
if content_type == 'web' and 'content' not in document:
document['content'] = self._extract_web_content(document['url'])
# Clean and normalize text
if 'content' in document:
document['content'] = self._clean_text(document['content'])
document['sentences'] = sent_tokenize(document['content'])
return document
def _extract_pdf_content(self, url):
"""Extract text from a PDF file"""
try:
response = requests.get(url)
with open('temp.pdf', 'wb') as f:
f.write(response.content)
text = ""
with pdfplumber.open('temp.pdf') as pdf:
for page in pdf.pages:
text += page.extract_text() or ""
import os
os.remove('temp.pdf')
return text
except Exception as e:
print(f"Error extracting PDF content: {e}")
return ""
def _extract_web_content(self, url):
"""Extract content from a web page"""
try:
response = requests.get(url, timeout=10)
soup = BeautifulSoup(response.text, 'html.parser')
# Remove script and style elements
for script in soup(["script", "style"]):
script.extract()
# Get text
text = soup.get_text(separator=' ', strip=True)
return text
except Exception as e:
print(f"Error extracting web content: {e}")
return ""
def _clean_text(self, text):
"""Clean and normalize text"""
# Replace newlines and multiple spaces
text = re.sub(r'\s+', ' ', text)
# Remove special characters but keep punctuation
text = re.sub(r'[^\w\s.,;:!?"\'-]', '', text)
return text.strip()
```
### Step 2: Content Extraction Pipeline
Create `processing/pipeline.py`:
```python
from .document_processor import DocumentProcessor
from concurrent.futures import ThreadPoolExecutor
class ProcessingPipeline:
def __init__(self, max_workers=4):
self.document_processor = DocumentProcessor()
self.max_workers = max_workers
def process_batch(self, documents):
"""Process a batch of documents concurrently"""
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
processed_docs = list(executor.map(self.document_processor.process_document, documents))
return processed_docs
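```

To see the pipeline in action, feed it the collector's output. A minimal sketch (the query and worker count are illustrative):

```python
# Example: clean and sentence-split a small batch of collected documents
from ai_research_machine.data_collection.collector import DataCollector
from ai_research_machine.processing.pipeline import ProcessingPipeline

docs = DataCollector().collect("graph neural networks", sources=["arxiv"], limit_per_source=3)
processed = ProcessingPipeline(max_workers=2).process_batch(docs)
for doc in processed:
    print(doc['title'], "-", len(doc.get('sentences', [])), "sentences")
```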
## Part 4: Building the Analysis Engine
### Step 1: Create Text Analysis Tools
Create `analysis/text_analyzer.py`:
```python
from sklearn.feature_extraction.text import TfidfVectorizer
from nltk.corpus import stopwords
import nltk
import numpy as np
import openai
import os
from dotenv import load_dotenv
# Download required NLTK data
nltk.download('stopwords', quiet=True)
load_dotenv()
openai.api_key = os.getenv('OPENAI_API_KEY')
class TextAnalyzer:
def __init__(self):
self.stopwords = set(stopwords.words('english'))
def extract_keywords(self, text, top_n=10):
"""Extract most important keywords using TF-IDF"""
vectorizer = TfidfVectorizer(
max_df=0.85,
min_df=2,
max_features=200,
stop_words=self.stopwords
)
        # Handle the single-document case: the df thresholds above assume a corpus,
        # so use a per-document vectorizer when given one string
        if isinstance(text, str):
            vectorizer = TfidfVectorizer(max_features=200, stop_words=list(self.stopwords))
            tfidf_matrix = vectorizer.fit_transform([text])
        else:
            tfidf_matrix = vectorizer.fit_transform(text)
feature_names = vectorizer.get_feature_names_out()
if isinstance(text, str):
tfidf_scores = tfidf_matrix.toarray()[0]
else:
tfidf_scores = np.mean(tfidf_matrix.toarray(), axis=0)
# Sort by score
sorted_indices = np.argsort(tfidf_scores)[::-1]
top_keywords = [feature_names[i] for i in sorted_indices[:top_n]]
return top_keywords
def summarize_text(self, text, max_length=150):
"""Summarize text using OpenAI API"""
if len(text) < max_length:
return text
try:
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": "You are a research assistant. Summarize the following text concisely."},
{"role": "user", "content": text}
],
max_tokens=max_length,
temperature=0.3
)
return response.choices[0].message['content'].strip()
except Exception as e:
print(f"Error summarizing text: {e}")
# Fallback to simple extractive summarization
sentences = nltk.sent_tokenize(text)
return " ".join(sentences[:3])
### Step 2: Create Topic Modeling
Create `analysis/topic_modeler.py`:
```python
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.decomposition import LatentDirichletAllocation
import numpy as np
class TopicModeler:
def __init__(self, n_topics=5, n_top_words=10):
self.n_topics = n_topics
self.n_top_words = n_top_words
self.vectorizer = CountVectorizer(max_df=0.95, min_df=2, stop_words='english')
self.lda = LatentDirichletAllocation(
n_components=n_topics,
random_state=42,
learning_method='online'
)
def extract_topics(self, documents):
"""Extract topics from a collection of documents"""
# Extract document content
doc_texts = [doc.get('content', '') for doc in documents if doc.get('content')]
if not doc_texts:
return []
# Create document-term matrix
dtm = self.vectorizer.fit_transform(doc_texts)
# Fit LDA model
self.lda.fit(dtm)
# Extract feature names
feature_names = self.vectorizer.get_feature_names_out()
# Extract topics
topics = []
for topic_idx, topic in enumerate(self.lda.components_):
top_words_idx = topic.argsort()[:-self.n_top_words - 1:-1]
top_words = [feature_names[i] for i in top_words_idx]
topics.append({
'id': topic_idx,
'words': top_words,
'weight': float(np.mean(topic))
})
return topics
```
### Step 3: Create Research Analyzer
Create `analysis/research_analyzer.py`:
```python
from .text_analyzer import TextAnalyzer
from .topic_modeler import TopicModeler
import openai
import os
class ResearchAnalyzer:
def __init__(self):
self.text_analyzer = TextAnalyzer()
self.topic_modeler = TopicModeler()
def analyze(self, documents):
"""Analyze a collection of documents"""
# Process each document
for doc in documents:
if 'content' in doc:
# Extract keywords
doc['keywords'] = self.text_analyzer.extract_keywords(doc['content'], top_n=8)
# Generate summary
if len(doc.get('content', '')) > 500:
doc['ai_summary'] = self.text_analyzer.summarize_text(doc['content'], max_length=200)
# Extract topics across all documents
topics = self.topic_modeler.extract_topics(documents)
# Generate research insights
insights = self.generate_research_insights(documents, topics)
return {
'documents': documents,
'topics': topics,
'insights': insights
}
def generate_research_insights(self, documents, topics):
"""Generate research insights using OpenAI API"""
if not documents:
return []
try:
# Create a summary of document titles and summaries
doc_summaries = []
for doc in documents[:10]: # Limit to top 10 to stay within token limits
summary = doc.get('ai_summary', doc.get('summary', ''))
if summary:
doc_summaries.append(f"Title: {doc.get('title', 'Untitled')}\nSummary: {summary}")
doc_summary_text = "\n\n".join(doc_summaries)
# Format topics
topic_text = "\n".join([
f"Topic {t['id']}: {', '.join(t['words'])}"
for t in topics
])
prompt = f"""Based on the following research document summaries and topics:
Document Summaries:
{doc_summary_text}
Topics Identified:
{topic_text}
Please provide 3-5 key insights or findings that can be drawn from this research collection.
Focus on identifying patterns, contradictions, research gaps, or notable developments in the field.
"""
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": "You are a research analyst specializing in synthesizing information from multiple sources."},
{"role": "user", "content": prompt}
],
max_tokens=500,
temperature=0.4
)
# Parse insights from response
insight_text = response.choices[0].message['content'].strip()
insights = [line.strip() for line in insight_text.split("\n") if line.strip()]
return insights
except Exception as e:
print(f"Error generating research insights: {e}")
return ["Unable to generate insights due to an error."]
## Part 5: Building the Storage System
### Step 1: Create Document Database
Create `storage/document_store.py`:
```python
import json
import os
from datetime import datetime
import chromadb
import hashlib
class DocumentStore:
def __init__(self, data_dir="./research_data"):
self.data_dir = data_dir
# Ensure directory exists
os.makedirs(data_dir, exist_ok=True)
        # Initialize ChromaDB for vector search (persistent client API, chromadb >= 0.4)
        self.chroma_client = chromadb.PersistentClient(path=f"{data_dir}/chroma")
# Create or get collection
self.collection = self.chroma_client.get_or_create_collection("research_documents")
def save_research_results(self, query, results, timestamp=None):
"""Save research results to disk"""
if timestamp is None:
timestamp = datetime.now().isoformat()
research_id = hashlib.md5(f"{query}_{timestamp}".encode()).hexdigest()
# Prepare documents for vector storage
docs = []
metadatas = []
ids = []
for idx, doc in enumerate(results['documents']):
doc_id = f"{research_id}_{idx}"
ids.append(doc_id)
# Store content for embedding
content = doc.get('content', doc.get('summary', ''))
if not content:
content = f"{doc.get('title', '')} {doc.get('ai_summary', '')}"
docs.append(content)
# Store metadata
metadata = {
'title': doc.get('title', 'Untitled'),
'url': doc.get('url', ''),
'source_type': doc.get('source_type', 'unknown'),
'query': query,
'research_id': research_id
}
metadatas.append(metadata)
# Add to vector database
if docs:
self.collection.add(
documents=docs,
metadatas=metadatas,
ids=ids
)
# Save complete results to file
filename = f"{self.data_dir}/{research_id}.json"
        with open(filename, 'w') as f:
            json.dump({
                'query': query,
                'timestamp': timestamp,
                'research_id': research_id,
                'results': results
            }, f, indent=2, default=str)  # default=str handles datetime objects (e.g. arXiv dates)
return research_id
def load_research(self, research_id):
"""Load research results by ID"""
filename = f"{self.data_dir}/{research_id}.json"
if not os.path.exists(filename):
return None
with open(filename, 'r') as f:
return json.load(f)
def search_documents(self, query, limit=10):
"""Search for documents relevant to query"""
results = self.collection.query(
query_texts=[query],
n_results=limit
)
found_documents = []
for i, doc_id in enumerate(results['ids'][0]):
# Extract research_id from document ID
research_id = doc_id.split('_')[0]
# Load research
research = self.load_research(research_id)
if not research:
continue
# Find the specific document
doc_idx = int(doc_id.split('_')[1])
if 0 <= doc_idx < len(research['results']['documents']):
document = research['results']['documents'][doc_idx]
found_documents.append({
'document': document,
'research_id': research_id,
'research_query': research['query'],
'research_timestamp': research['timestamp']
})
return found_documents
def list_researches(self, limit=20):
"""List recent researches"""
researches = []
# Get all JSON files in data directory
files = [f for f in os.listdir(self.data_dir) if f.endswith('.json')]
# Sort by modification time (newest first)
files.sort(key=lambda f: os.path.getmtime(os.path.join(self.data_dir, f)), reverse=True)
# Load research metadata
for filename in files[:limit]:
research_id = filename.split('.')[0]
research = self.load_research(research_id)
if research:
researches.append({
'id': research_id,
'query': research['query'],
'timestamp': research['timestamp'],
'document_count': len(research['results']['documents']),
'topics': research['results']['topics']
})
return researches
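```

A sketch of the store's round trip: save one toy result set, then query the vector index. The hand-built `analysis` dict here just stands in for whatever `ResearchAnalyzer.analyze` returns:

```python
# Example: persist a result set, then search it back by similarity
from ai_research_machine.storage.document_store import DocumentStore

store = DocumentStore(data_dir="./research_data")

analysis = {  # minimal stand-in for ResearchAnalyzer.analyze(...) output
    "documents": [{"title": "Example paper", "summary": "A short abstract about alignment.",
                   "url": "https://example.org", "source_type": "web"}],
    "topics": [],
    "insights": []
}
research_id = store.save_research_results("example query", analysis)
print("Saved research:", research_id)

for hit in store.search_documents("alignment", limit=3):
    print(hit["document"].get("title"), "->", hit["research_query"])
```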
## Part 6: Building the User Interface
### Step 1: Create API Server
Create `ui/api.py`:
```python
from flask import Flask, request, jsonify
from ..data_collection.collector import DataCollector
from ..processing.pipeline import ProcessingPipeline
from ..analysis.research_analyzer import ResearchAnalyzer
from ..storage.document_store import DocumentStore
import threading
import queue
app = Flask(__name__)
# Initialize components
data_collector = DataCollector()
processing_pipeline = ProcessingPipeline()
research_analyzer = ResearchAnalyzer()
document_store = DocumentStore()
# Task queue for background processing
task_queue = queue.Queue()
results = {}
def worker():
"""Background worker to process research tasks"""
while True:
try:
task_id, query, sources, limit = task_queue.get()
# Update status
results[task_id] = {'status': 'collecting', 'progress': 10}
# Collect data
documents = data_collector.collect(query, sources, limit)
results[task_id] = {'status': 'processing', 'progress': 30}
# Process documents
processed_docs = processing_pipeline.process_batch(documents)
results[task_id] = {'status': 'analyzing', 'progress': 60}
# Analyze research
analysis_results = research_analyzer.analyze(processed_docs)
results[task_id] = {'status': 'storing', 'progress': 90}
# Store results
research_id = document_store.save_research_results(query, analysis_results)
# Complete
results[task_id] = {
'status': 'complete',
'progress': 100,
'research_id': research_id
}
# Remove completed task after 1 hour
def cleanup():
if task_id in results:
del results[task_id]
threading.Timer(3600, cleanup).start()
except Exception as e:
print(f"Worker error: {e}")
results[task_id] = {'status': 'error', 'error': str(e)}
finally:
task_queue.task_done()
# Start worker thread
worker_thread = threading.Thread(target=worker, daemon=True)
worker_thread.start()
@app.route('/api/research', methods=['POST'])
def start_research():
"""Start a new research task"""
data = request.json
query = data.get('query')
sources = data.get('sources', None)
limit = data.get('limit', 10)
if not query:
return jsonify({'error': 'Query is required'}), 400
# Generate task ID
task_id = f"task_{len(results) + 1}"
# Initialize task status
results[task_id] = {'status': 'queued', 'progress': 0}
# Add task to queue
task_queue.put((task_id, query, sources, limit))
return jsonify({'task_id': task_id})
@app.route('/api/research/status/<task_id>', methods=['GET'])
def get_research_status(task_id):
"""Get the status of a research task"""
if task_id not in results:
return jsonify({'error': 'Task not found'}), 404
return jsonify(results[task_id])
@app.route('/api/research/<research_id>', methods=['GET'])
def get_research(research_id):
"""Get research results by ID"""
research = document_store.load_research(research_id)
if not research:
return jsonify({'error': 'Research not found'}), 404
return jsonify(research)
@app.route('/api/researches', methods=['GET'])
def list_researches():
"""List recent researches"""
limit = request.args.get('limit', 20, type=int)
researches = document_store.list_researches(limit)
return jsonify(researches)
@app.route('/api/search', methods=['GET'])
def search_documents():
"""Search for documents"""
query = request.args.get('q')
limit = request.args.get('limit', 10, type=int)
if not query:
return jsonify({'error': 'Query parameter q is required'}), 400
documents = document_store.search_documents(query, limit)
return jsonify(documents)
def start_server(host='0.0.0.0', port=5000, debug=False):
"""Start the API server"""
app.run(host=host, port=port, debug=debug)
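```

With the server running (Part 8 adds the entry point), the endpoints can be exercised from any HTTP client. A minimal sketch using `requests`, assuming the default host and port:

```python
# Example: start a research task over HTTP and poll until it finishes
import time
import requests

API = "http://localhost:5000/api"
task = requests.post(f"{API}/research", json={"query": "quantum computing", "limit": 5}).json()

while True:
    status = requests.get(f"{API}/research/status/{task['task_id']}").json()
    print(status)
    if status.get("status") in ("complete", "error"):
        break
    time.sleep(2)

if status.get("status") == "complete":
    research = requests.get(f"{API}/research/{status['research_id']}").json()
    print("Insights:", research["results"]["insights"])
```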
### Step 2: Create Command Line Interface
Create `ui/cli.py`:
```python
import argparse
import requests
import time
import json
import os
from tabulate import tabulate
from datetime import datetime
class ResearchCLI:
def __init__(self, api_base="http://localhost:5000/api"):
self.api_base = api_base
def start_research(self, query, sources=None, limit=10):
"""Start a new research task"""
data = {
'query': query,
'limit': limit
}
if sources:
data['sources'] = sources.split(',')
response = requests.post(f"{self.api_base}/research", json=data)
if response.status_code != 200:
print(f"Error: {response.json().get('error', 'Unknown error')}")
return
task_id = response.json()['task_id']
print(f"Started research task: {task_id}")
print("Monitoring progress...")
while True:
status_resp = requests.get(f"{self.api_base}/research/status/{task_id}")
if status_resp.status_code != 200:
print(f"Error checking status: {status_resp.text}")
break
status = status_resp.json()
if status['status'] == 'error':
print(f"Research failed: {status.get('error', 'Unknown error')}")
break
print(f"Status: {status['status']} ({status['progress']}%)")
if status['status'] == 'complete':
print(f"Research complete! ID: {status['research_id']}")
self.show_research(status['research_id'])
break
time.sleep(2)
def list_researches(self, limit=10):
"""List recent researches"""
response = requests.get(f"{self.api_base}/researches?limit={limit}")
if response.status_code != 200:
print(f"Error: {response.json().get('error', 'Unknown error')}")
return
researches = response.json()
if not researches:
print("No researches found.")
return
table_data = []
for research in researches:
table_data.append([
research['id'][:8], # Truncated ID
research['query'],
research['timestamp'][:10], # Just the date
research['document_count']
])
print(tabulate(
table_data,
headers=["ID", "Query", "Date", "Docs"],
tablefmt="grid"
))
def show_research(self, research_id):
"""Display research results"""
response = requests.get(f"{self.api_base}/research/{research_id}")
if response.status_code != 200:
print(f"Error: {response.json().get('error', 'Unknown error')}")
return
research = response.json()
print("\n" + "="*80)
print(f"Research: {research['query']}")
print(f"Date: {research['timestamp']}")
print(f"ID: {research['research_id']}")
print("="*80)
# Show insights
insights = research['results'].get('insights', [])
if insights:
print("\nKEY INSIGHTS:")
for i, insight in enumerate(insights, 1):
print(f"{i}. {insight}")
        # Show topics
topics = research['results'].get('topics', [])
if topics:
print("\nDISCOVERED TOPICS:")
for topic in topics:
print(f"Topic {topic['id']}: {', '.join(topic['words'])}")
# Show documents
documents = research['results'].get('documents', [])
if documents:
print(f"\nDOCUMENTS ({len(documents)}):")
for i, doc in enumerate(documents[:10], 1): # Show only first 10
print(f"\n{i}. {doc.get('title', 'Untitled')}")
print(f" Source: {doc.get('source_type', 'unknown')} | URL: {doc.get('url', 'N/A')}")
if 'ai_summary' in doc:
print(f" Summary: {doc['ai_summary'][:150]}...")
elif 'summary' in doc:
print(f" Summary: {doc['summary'][:150]}...")
if len(documents) > 10:
print(f"\n... and {len(documents) - 10} more documents")
def search(self, query, limit=10):
"""Search for documents"""
response = requests.get(f"{self.api_base}/search?q={query}&limit={limit}")
if response.status_code != 200:
print(f"Error: {response.json().get('error', 'Unknown error')}")
return
documents = response.json()
if not documents:
print("No matching documents found.")
return
print(f"Found {len(documents)} matching documents:")
for i, result in enumerate(documents, 1):
doc = result['document']
print(f"\n{i}. {doc.get('title', 'Untitled')}")
print(f" From research: {result['research_query']} ({result['research_timestamp'][:10]})")
print(f" Source: {doc.get('source_type', 'unknown')} | URL: {doc.get('url', 'N/A')}")
if 'ai_summary' in doc:
print(f" Summary: {doc['ai_summary'][:150]}...")
elif 'summary' in doc:
print(f" Summary: {doc['summary'][:150]}...")
def main():
    parser = argparse.ArgumentParser(description="AI Research Machine CLI")
    subparsers = parser.add_subparsers(dest="command")
# Start research command
start_parser = subparsers.add_parser("start", help="Start a new research task")
start_parser.add_argument("query", help="Research query")
start_parser.add_argument("--sources", help="Comma-separated list of sources (arxiv,news,web)")
start_parser.add_argument("--limit", type=int, default=10, help="Number of results per source")
# List researches command
list_parser = subparsers.add_parser("list", help="List recent researches")
list_parser.add_argument("--limit", type=int, default=10, help="Number of researches to list")
# Show research command
show_parser = subparsers.add_parser("show", help="Show research results")
show_parser.add_argument("id", help="Research ID")
# Search command
search_parser = subparsers.add_parser("search", help="Search for documents")
search_parser.add_argument("query", help="Search query")
search_parser.add_argument("--limit", type=int, default=10, help="Number of results")
args = parser.parse_args()
cli = ResearchCLI()
if args.command == "start":
cli.start_research(args.query, args.sources, args.limit)
elif args.command == "list":
cli.list_researches(args.limit)
elif args.command == "show":
cli.show_research(args.id)
elif args.command == "search":
cli.search(args.query, args.limit)
else:
parser.print_help()
if __name__ == "__main__":
    main()
```
## Part 7: Scheduling and Automation
### Step 1: Create Scheduler
Create `ai_research_machine/scheduler.py`:
```python
import schedule
import time
import threading
import os
import json
from .data_collection.collector import DataCollector
from .processing.pipeline import ProcessingPipeline
from .analysis.research_analyzer import ResearchAnalyzer
from .storage.document_store import DocumentStore
class ResearchScheduler:
def __init__(self):
self.data_collector = DataCollector()
self.processing_pipeline = ProcessingPipeline()
self.research_analyzer = ResearchAnalyzer()
self.document_store = DocumentStore()
self.schedule_file = "scheduled_research.json"
self.running = False
self.schedule_thread = None
def load_schedules(self):
"""Load scheduled research from file"""
if not os.path.exists(self.schedule_file):
return []
with open(self.schedule_file, 'r') as f:
return json.load(f)
def save_schedules(self, schedules):
"""Save scheduled research to file"""
with open(self.schedule_file, 'w') as f:
json.dump(schedules, f, indent=2)
def add_schedule(self, query, frequency, sources=None, limit=10):
"""Add a new scheduled research task"""
schedules = self.load_schedules()
# Create new schedule
new_schedule = {
'id': len(schedules) + 1,
'query': query,
'frequency': frequency,
'sources': sources,
'limit': limit,
'enabled': True
}
schedules.append(new_schedule)
self.save_schedules(schedules)
# Apply schedule
self._apply_schedule(new_schedule)
return new_schedule['id']
def run_scheduled_research(self, schedule):
"""Run a scheduled research task"""
print(f"Running scheduled research: {schedule['query']}")
# Collect data
documents = self.data_collector.collect(
schedule['query'],
schedule['sources'],
schedule['limit']
)
# Process documents
processed_docs = self.processing_pipeline.process_batch(documents)
# Analyze research
analysis_results = self.research_analyzer.analyze(processed_docs)
# Store results
research_id = self.document_store.save_research_results(
schedule['query'],
analysis_results
)
print(f"Completed scheduled research: {schedule['query']} (ID: {research_id})")
    def _apply_schedule(self, task):
        """Register a task with the schedule library.

        The parameter is named `task` so it does not shadow the imported
        `schedule` module."""
        if not task['enabled']:
            return
        frequency = task['frequency']
        if frequency == 'daily':
            schedule.every().day.at("09:00").do(
                self.run_scheduled_research, task
            ).tag(f"research_{task['id']}")
        elif frequency == 'weekly':
            schedule.every().week.do(
                self.run_scheduled_research, task
            ).tag(f"research_{task['id']}")
        elif frequency == 'monthly':
            schedule.every(30).days.do(
                self.run_scheduled_research, task
            ).tag(f"research_{task['id']}")
    def update_schedule(self, schedule_id, enabled=None, frequency=None):
        """Update an existing schedule"""
        schedules = self.load_schedules()
        for task in schedules:
            if task['id'] == schedule_id:
                if enabled is not None:
                    task['enabled'] = enabled
                if frequency is not None:
                    task['frequency'] = frequency
                # Clear existing jobs for this schedule, then re-apply it
                schedule.clear(f"research_{schedule_id}")
                self._apply_schedule(task)
                break
        self.save_schedules(schedules)
def start(self):
"""Start the scheduler thread"""
if self.running:
return
        # Load and apply all schedules (loop variable must not shadow the schedule module)
        schedules = self.load_schedules()
        for task in schedules:
            self._apply_schedule(task)
# Start scheduler thread
def run_scheduler():
self.running = True
while self.running:
schedule.run_pending()
time.sleep(60)
self.schedule_thread = threading.Thread(target=run_scheduler)
self.schedule_thread.daemon = True
self.schedule_thread.start()
def stop(self):
"""Stop the scheduler thread"""
self.running = False
if self.schedule_thread:
self.schedule_thread.join(timeout=5)
self.schedule_thread = None
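```

A small sketch of driving the scheduler directly (in practice the API server starts it for you in the next step). Starting the background thread before registering the new task avoids applying the same job twice; the query and frequency are examples:

```python
# Example: run the scheduler and register a weekly research task
from ai_research_machine.scheduler import ResearchScheduler

scheduler = ResearchScheduler()
scheduler.start()  # applies any schedules already on disk and starts the background thread

schedule_id = scheduler.add_schedule("AI regulation news", frequency="weekly",
                                     sources=["news"], limit=10)
print("Created schedule:", schedule_id)
# keep the process alive (e.g. inside a long-running service) so jobs can fire
```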
### Step 2: Add Scheduler to API
Add to `ui/api.py`:
```python
from ..scheduler import ResearchScheduler
# Initialize scheduler
scheduler = ResearchScheduler()
scheduler.start()
# Add endpoints
@app.route('/api/schedules', methods=['GET'])
def list_schedules():
"""List scheduled research tasks"""
schedules = scheduler.load_schedules()
return jsonify(schedules)
@app.route('/api/schedules', methods=['POST'])
def add_schedule():
"""Add a new scheduled research task"""
data = request.json
query = data.get('query')
frequency = data.get('frequency', 'daily')
sources = data.get('sources')
limit = data.get('limit', 10)
if not query:
return jsonify({'error': 'Query is required'}), 400
schedule_id = scheduler.add_schedule(query, frequency, sources, limit)
return jsonify({'id': schedule_id})
@app.route('/api/schedules/<int:schedule_id>', methods=['PUT'])
def update_schedule(schedule_id):
"""Update a scheduled research task"""
data = request.json
enabled = data.get('enabled')
frequency = data.get('frequency')
scheduler.update_schedule(schedule_id, enabled, frequency)
return jsonify({'success': True})
```
## Part 8: Main Application Entry Point
Create `ai_research_machine/__main__.py`:
```python
import argparse
from .ui.api import start_server
from .ui.cli import main as cli_main
def main():
parser = argparse.ArgumentParser(description='AI Research Machine')
subparsers = parser.add_subparsers(dest='command')
# API server command
server_parser = subparsers.add_parser('server', help='Start the API server')
server_parser.add_argument('--host', default='0.0.0.0', help='Server host')
server_parser.add_argument('--port', type=int, default=5000, help='Server port')
server_parser.add_argument('--debug', action='store_true', help='Enable debug mode')
# CLI command
cli_parser = subparsers.add_parser('cli', help='Run the command line interface')
args, remaining = parser.parse_known_args()
if args.command == 'server':
print(f"Starting AI Research Machine server on {args.host}:{args.port}")
start_server(args.host, args.port, args.debug)
elif args.command == 'cli':
# Pass remaining args to CLI
import sys
sys.argv = [sys.argv[0]] + remaining
cli_main()
else:
parser.print_help()
if __name__ == '__main__':
main()
```
## Part 9: Final Integration and Deployment
### Step 1: Create Setup File
Create `setup.py` in the project root:
```python
from setuptools import setup, find_packages
setup(
name="ai_research_machine",
version="0.1.0",
packages=find_packages(),
install_requires=[
"requests",
"beautifulsoup4",
"nltk",
"scikit-learn",
"pandas",
"numpy",
"openai",
"langchain",
"chromadb",
"pdfplumber",
"arxiv",
"pymupdf",
"flask",
"python-dotenv",
"schedule",
"tabulate"
],
entry_points={
"console_scripts": [
"ai_research=ai_research_machine.__main__:main",
],
},
)
```
### Step 2: Create a Dockerfile
Create `Dockerfile` in the project root:
```dockerfile
FROM python:3.9-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application
COPY . .
# Install application
RUN pip install -e .
# Expose API port
EXPOSE 5000
# Run server
ENTRYPOINT ["ai_research", "server"]
```
### Step 3: Create Requirements File
Create `requirements.txt` in the project root:
```
requests>=2.28.0
beautifulsoup4>=4.11.0
nltk>=3.7
scikit-learn>=1.1.0
pandas>=1.4.0
numpy>=1.22.0
openai>=0.27.0
langchain>=0.0.234
chromadb>=0.4.0
pdfplumber>=0.7.0
arxiv>=1.4.0
pymupdf>=1.20.0
flask>=2.2.0
python-dotenv>=0.21.0
schedule>=1.1.0
tabulate>=0.9.0
```
## Using Your AI Research Machine
Now that you’ve built your AI research machine, here’s how to use it:
### Installation
```bash
# Clone the repository (if using Git)
git clone https://github.com/yourusername/ai-research-machine.git
cd ai-research-machine
# Install the package
pip install -e .
```
### Starting the API Server
```bash
# Start the server
ai_research server --port 5000
```
### Using the CLI
```bash
# Start a new research
ai_research cli start "quantum computing recent advances" --sources arxiv,news
# List recent researches
ai_research cli list
# Show research results
ai_research cli show <research_id>
# Search across all researches
ai_research cli search "neural networks"
```
### Scheduling Regular Research
```bash
# Using the API
curl -X POST http://localhost:5000/api/schedules \
-H "Content-Type: application/json" \
-d '{"query": "artificial intelligence ethics", "frequency": "weekly"}'
```
## Extending Your Research Machine
Here are some ways to extend your AI research machine:
- Add more data sources: Implement connectors for academic databases, RSS feeds, or social media (an RSS example is sketched after this list)
- Improve analysis: Integrate more advanced NLP techniques for entity recognition or sentiment analysis
- Create a web UI: Build a frontend using React or Vue.js to visualize research results
- Add citation extraction: Extract and manage citations from academic papers
- Implement collaborative features: Allow multiple users to share and annotate research
- Add notification system: Set up email or messaging notifications for new research findings
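As an example of the first item, an RSS connector can reuse the `DataSource` interface from Part 2. The sketch below assumes you also install the `feedparser` package (not in the original dependency list) and does only a crude keyword match against the query:

```python
# data_collection/rss_source.py (sketch): a new source following the DataSource interface
import feedparser  # extra dependency: pip install feedparser
from .sources import DataSource

class RSSSource(DataSource):
    def __init__(self, feed_urls):
        self.feed_urls = feed_urls

    def fetch(self, query, limit=10):
        results = []
        for url in self.feed_urls:
            feed = feedparser.parse(url)
            for entry in feed.entries:
                # crude keyword filter against the query
                if query.lower() in (entry.get('title', '') + entry.get('summary', '')).lower():
                    results.append({
                        'title': entry.get('title'),
                        'authors': [entry.get('author')] if entry.get('author') else [],
                        'summary': entry.get('summary'),
                        'published': entry.get('published'),
                        'url': entry.get('link'),
                        'source_type': 'rss',
                        'content_type': 'web'
                    })
        return results[:limit]
```

Registering it is a one-line change: add an `'rss'` entry pointing at an `RSSSource(...)` instance to `self.sources` in `DataCollector.__init__`.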
## Conclusion
You now have an AI-driven research machine that can automatically gather, process, and analyze information on topics you care about. It can save hours of manual research while surfacing insights and keeping your findings organized.
The modular architecture allows for easy extension and customization to suit your specific research needs. Whether you’re a student, academic researcher, journalist, or professional staying on top of industry trends, this tool will help you manage information overload and focus on what matters most – understanding and applying the knowledge you gather.
Happy researching!