from google.analytics.data_v1beta import BetaAnalyticsDataClient
from facebook_business.api import FacebookAdsApi
import sqlite3
from datetime import datetime, timedelta
class MarketingPipeline:
def __init__(self):
self.ga_client = BetaAnalyticsDataClient()
self.fb_api = FacebookAdsApi.init(access_token=”your_token”)
self.db_connection = sqlite3.join(‘marketing_analytics.db’)
def extract_google_analytics(self):
“””Get web site visitors and conversion knowledge”””
# That is simplified – actual GA4 API calls are extra complicated
question = {
‘property’: ‘properties/your-property-id’,
‘dimensions’: [‘date’, ‘source’, ‘medium’],
‘metrics’: [‘sessions’, ‘conversions’, ‘revenue’],
‘date_ranges’: [{‘start_date’: ’30daysAgo’, ‘end_date’: ‘today’}]
}
response = self.ga_client.run_report(question)
# Convert to DataFrame
ga_data = pd.DataFrame([
{
‘date’: row.dimension_values[0].worth,
‘supply’: row.dimension_values[1].worth,
‘medium’: row.dimension_values[2].worth,
‘periods’: row.metric_values[0].worth,
‘conversions’: row.metric_values[1].worth,
‘income’: row.metric_values[2].worth
}
for row in response.rows
])
return ga_data
def extract_facebook_ads(self):
“””Get Fb marketing campaign efficiency”””
from facebook_business.adobjects.adaccount import AdAccount
ad_account = AdAccount(‘act_your-account-id’)
campaigns = ad_account.get_campaigns(fields=[
‘name’, ‘spend’, ‘impressions’, ‘clicks’, ‘conversions’
])
fb_data = pd.DataFrame([{
‘campaign_name’: campaign[‘name’],
‘spend’: float(marketing campaign[‘spend’]),
‘impressions’: int(marketing campaign[‘impressions’]),
‘clicks’: int(marketing campaign[‘clicks’]),
‘conversions’: int(marketing campaign.get(‘conversions’, 0))
} for marketing campaign in campaigns])
return fb_data
def transform_and_analyze(self, ga_data, fb_data):
“””Calculate ROI and buyer lifetime worth”””
# Clear Google Analytics knowledge
ga_data[‘revenue’] = pd.to_numeric(ga_data[‘revenue’], errors=’coerce’)
ga_data[‘conversions’] = pd.to_numeric(ga_data[‘conversions’], errors=’coerce’)
# Calculate metrics
ga_summary = ga_data.groupby([‘source’, ‘medium’]).agg({
‘periods’: ‘sum’,
‘conversions’: ‘sum’,
‘income’: ‘sum’
}).reset_index()
ga_summary[‘conversion_rate’] = ga_summary[‘conversions’] / ga_summary[‘sessions’]
ga_summary[‘revenue_per_session’] = ga_summary[‘revenue’] / ga_summary[‘sessions’]
# Calculate Fb ROI
fb_data[‘roi’] = (fb_data[‘conversions’] * 50 – fb_data[‘spend’]) / fb_data[‘spend’] # Assuming $50 common order worth
fb_data[‘cost_per_conversion’] = fb_data[‘spend’] / fb_data[‘conversions’].change(0, 1)
return ga_summary, fb_data
def load_to_dashboard(self, ga_summary, fb_data):
“””Save outcomes and set off dashboard replace”””
# Save to database
ga_summary.to_sql(‘ga_performance’, self.db_connection, if_exists=’change’)
fb_data.to_sql(‘fb_performance’, self.db_connection, if_exists=’change’)
# Create abstract report
report = {
‘date’: datetime.now().strftime(‘%Y-%m-%d’),
‘top_ga_source’: ga_summary.loc[ga_summary[‘revenue’].idxmax(), ‘supply’],
‘best_fb_campaign’: fb_data.loc[fb_data[‘roi’].idxmax(), ‘campaign_name’],
‘total_revenue’: ga_summary[‘revenue’].sum(),
‘total_ad_spend’: fb_data[‘spend’].sum()
}
# This might set off e mail alerts, Slack notifications, and so on.
print(f”Pipeline accomplished: Generated ${report[‘total_revenue’]:.2f} income from ${report[‘total_ad_spend’]:.2f} advert spend”)
return report
# Run the pipeline
if __name__ == “__main__”:
pipeline = MarketingPipeline()
# Extract knowledge
ga_data = pipeline.extract_google_analytics()
fb_data = pipeline.extract_facebook_ads()
# Remodel knowledge
ga_summary, fb_summary = pipeline.transform_and_analyze(ga_data, fb_data)
# Load outcomes
report = pipeline.load_to_dashboard(ga_summary, fb_summary)