    Amplifying Creativity: Building an AI-Powered Content Creation Assistant — Part 3
    by Markell Richards | Apr, 2025



    Photograph by israel palacio / Unsplash

    Welcome to part 3 of this series. In the previous post, we adapted our Jupyter Notebook implementation from part 1 into a FastAPI server that can serve users. We learned how to set up FastAPI and configure settings, as well as how to trigger workflows from REST endpoints.

    Previously, we identified some limitations in our solution. Users have to wait for a response until the workflow has completed, and we have been saving our content to a local folder. While that is fine for quick experimentation, in this post we will expand the solution to send progress events and use an appropriate storage mechanism.

    In addition to improving the solution, we will continue to abstract our code into succinct modules and pin our dependencies to ensure build reproducibility.

    In this post, we will cover how to:

    • Stream progress events to users throughout a workflow
    • Use the SQLAlchemy ORM to interact with Postgres
    • Run migrations with Alembic
    • Save images to Minio
    • Run multiple Docker applications together with Docker Compose
    • Configure the WebSocket endpoint in the FastAPI application
    Here is the project layout for this part:

    ├── README.md
    ├── alembic
    │   ├── README
    │   ├── env.py
    │   ├── script.py.mako
    │   └── versions
    ├── alembic.ini
    ├── docker-compose.yml
    ├── infrastructure
    │   └── init-db.sh
    ├── pyproject.toml
    ├── requirements
    │   ├── requirements-dev.txt
    │   └── requirements.txt
    └── src
        ├── content
        │   ├── api.py
        │   ├── models
        │   ├── prompts
        │   ├── repositories
        │   ├── schemas
        │   ├── services
        │   └── workflows
        ├── core
        │   ├── api.py
        │   ├── database.py
        │   ├── logs.py
        │   └── middleware.py
        ├── main.py
        └── settings
            └── config.py

    Compared to the previous post, the overall structure of the project has expanded. We now have a few top-level directories:

    • alembic: We will discuss this more in the SQLAlchemy section below, but this directory contains our database migrations. A migration essentially changes the structure of the database. These files are auto-generated, and we only need to modify env.py and alembic.ini.
    • alembic.ini is an auto-generated config file. We only need to remove one line from it, as most of the configuration is set up in env.py.
    • docker-compose.yml contains the configuration to run Postgres and Minio in containers on the same network, with storage provisioned using volumes.
    • infrastructure: Our database models use UUIDs for GUIDs. This directory contains a script that enables the uuid-ossp extension the first time Postgres runs.
    • pyproject.toml is a configuration file for packaging-related tools.
    • requirements: Holds our production and development dependencies, pinned by version. The section on setting up dependencies covers this in more detail.

    Source Directory

    The src directory contains the bulk of the source code. The code is structured around two domains: core and content. The core domain consists of files that affect the server holistically (middleware) or contain logic used by other domains, such as database sessions, logging functions, and a health check for a later k8s deployment. The content domain uses DDD tactical patterns, repositories and services, to cleanly abstract the code. Each domain (users will be next) follows the same general structure:

    • Models: SQLAlchemy models that define the database schema.
    • Schemas: Pydantic models that handle input and output data validation and serialization (a small sketch follows the next list).
    • Repositories: responsible for data access. Separating the data access logic makes testing and maintenance easier.
    • Services: coordinate business logic, interfacing between application logic and data access logic (repositories).

    The content domain specifically has:

    • Prompts: a central place to store prompts for now; other tools later in the series will handle prompt versioning and a more robust approach.
    • Workflows: stores all workflows.
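
    As an illustration of the schema layer, here is a minimal sketch of the image schemas; the fields mirror what the ImageRepository shown further down reads and returns, while the from_attributes setting is an assumption:

    # src/content/schemas/images.py (sketch)
    from uuid import UUID
    from pydantic import BaseModel, ConfigDict

    class ImageCreate(BaseModel):
        url: str

    class Image(BaseModel):
        # from_attributes lets model_validate accept ORM-style objects as well as schema instances (an assumption)
        model_config = ConfigDict(from_attributes=True)

        id: int
        guid: UUID
        url: str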

    In part 2, we set up our virtual environment using Conda and installed all our dependencies through the command line. This works initially, but the underlying dependencies can change and break the source code. In this project, we introduce pyproject.toml, which has become the de facto standard for configuring Python applications.

    [build-system]
    requires = ["setuptools", "wheel"]
    build-backend = "setuptools.build_meta"

    [project]
    name = "content_assistant"
    authors = [{ name = "Markell Richards" }]
    version = "0.0.1"
    requires-python = ">=3.12"
    readme = "README.md"
    dependencies = [
    "fastapi",
    "uvicorn[standard]",
    "tenacity",
    "asyncio",
    "llama-index-core",
    "llama-index-llms-openai",
    "tavily-python",
    "openai",
    "pydantic-settings",
    "minio",
    "JSON-log-formatter",
    "sqlalchemy",
    "alembic",
    "psycopg2-binary",
    "boto3",
    "asyncpg",
    ]
    [project.optional-dependencies]
    dev = ["ruff", "black"]

    With our application metadata and dependencies defined, we need to create a requirements.txt for pip to install. We will use pip-tools, which consists of pip-compile and pip-sync. pip-compile pins the dependencies so pip knows how to resolve them and which versions to install. Good dependency management revolves around dependency resolution and dependency locking. These two concepts give us deterministic builds, ensuring the app is built the same regardless of where and when.
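
    pip-tools itself is not listed in the project dependencies; assuming a plain pip-based environment, it can be installed directly:

    pip install pip-tools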

    To generate the hashes and lock versions, we use the following commands:

    pip-compile --generate-hashes -o requirements/requirements.txt pyproject.toml
    pip-compile --extra dev -o requirements/requirements-dev.txt pyproject.toml

    We use requirements-dev.txt for local development, as it includes the extra dependencies defined in the dev section of our pyproject.toml. These aren't needed for the production build and are excluded from requirements.txt.

    To install the dependencies, we use:

    pip-sync requirements/requirements.txt requirements/requirements-dev.txt

    pip-sync installs dependencies based on the output of pip-compile.

    Each domain has its own api.py with a set of routes. These are defined as routers and included in our main.py, as shown here:

    app = FastAPI(version=version)
    app.include_router(core_router)
    app.include_router(content_router)

    In part 6 of this series, we will deploy our full-stack application to Kubernetes. Typically, the containers in Kubernetes pods should have a way to check the health of the application. More on this will be covered later, but for now we define a simple health check endpoint that returns the version of our API (the version is specified in pyproject.toml):

    from fastapi import APIRouter, status
    from fastapi.requests import Request

    router = APIRouter(tags=["Core Endpoints"])

    @router.get("/health", status_code=status.HTTP_200_OK)
    async def healthcheck(request: Request) -> dict:
        return {"version": request.app.version}

    We establish a WebSocket connection to provide users with real-time updates as a workflow proceeds through its steps.

    💡 This is a minimal and simple example of streaming updates over a WebSocket connection. It will need to be improved before being used in production.

    router = APIRouter(tags=["Content Endpoints"])

    @router.websocket("/content")
    async def advancedContentFlow(websocket: WebSocket, db: Session = Depends(get_db), settings: Settings = Depends(get_settings)):
        await websocket.accept()

        s3_client = boto3.client(
            's3',
            endpoint_url=settings.MINIO_ENDPOINT,
            aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
            aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
            region_name='us-east-1',
            # Disable SSL verification if using HTTP
            config=boto3.session.Config(signature_version='s3v4')
        )

        workflow_repository = WorkflowRepository(db)
        blog_repository = BlogRepository(db)
        social_media_repository = SocialMediaRepository(db)
        image_repository = ImageRepository(db)

        workflow_service = WorkflowService(workflow_repository)
        blog_service = BlogService(blog_repository)
        social_media_service = SocialMediaService(social_media_repository)
        image_service = ImageService(image_repository=image_repository, s3_client=s3_client)

        workflow = AdvancedWorkflow(settings, db, workflow_service=workflow_service, blog_service=blog_service,
                                    social_media_service=social_media_service, image_service=image_service)
        try:
            data = await websocket.receive_json()
            logger.info(data)
            handler: WorkflowHandler = workflow.run(topic=data["topic"], research=data["research"])

            async for event in handler.stream_events():
                if isinstance(event, ProgressEvent):
                    await websocket.send_json({
                        "type": "progress_event",
                        "payload": str(event.msg)
                    })

            result = await handler
            await websocket.send_json({
                "type": "results",
                "payload": result
            })
        except Exception as e:
            await db.rollback()
            await websocket.send_json({
                "type": "error",
                "payload": "Something went wrong"
            })
            logger.error(e)
        finally:
            await websocket.close()

    Stepping through the code:

    1. Using dependency injection, we inject a database session and settings object into each connection.
    2. Accept the incoming WebSocket connection.
    3. Minio is S3-compatible object storage. We use the AWS boto3 SDK to interact with Minio. We create an s3_client to pass into our image_service, which contains the logic to upload images.
    4. We create instances of each repository and service type: workflow, blog, social media, and image.
    5. We create an instance of AdvancedWorkflow and pass in the settings, db session, and each service.
    6. We accept a JSON payload containing a content topic and a research boolean flag.
    7. We run our workflow and listen for ProgressEvent. As the workflow progresses, each step publishes a ProgressEvent with a msg that is sent to the client.
    8. Once the workflow finishes, the user gets a result payload indicating whether the workflow completed or failed.
    9. Error handling in case something goes wrong.
    10. Finally, we close the WebSocket connection.
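
    For reference, here is a rough sketch of how a client might drive this endpoint using the third-party websockets package; the host, port, and payload values are assumptions:

    import asyncio
    import json

    import websockets  # third-party client library, installed separately

    async def main():
        # Assumes the FastAPI app is running locally on uvicorn's default port
        async with websockets.connect("ws://localhost:8000/content") as ws:
            await ws.send(json.dumps({"topic": "AI content assistants", "research": True}))
            while True:
                message = json.loads(await ws.recv())
                print(message["type"], message["payload"])
                if message["type"] in ("results", "error"):
                    break

    asyncio.run(main())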

    As mentioned, we added Postgres and Minio to the project to persist the entities created throughout the workflows for later retrieval. A simple entity relationship diagram (ERD) maps how the tables fit together; the relationships are sketched after the list below. This is subject to change, but at this point the structure gives us some standard access patterns:

    • Each blog and social media post belongs to exactly one workflow. If we fetch a workflow, we can grab all related entities.
    • Within a single workflow, the blog and social media posts share the same generated image. However, the user can later change the media used through the interface.
    • A user can edit the content of blog and social media posts in later enhancements.

    💡 There’s a scenario that arises the place photos will likely be orphaned. How would you do the cleanup course of? (will likely be revealed later within the sequence)

    To facilitate interaction with the database and management of its structure, we use SQLAlchemy and Alembic. SQLAlchemy is an object-relational mapper (ORM) that lets you interact with databases using Python. It offers various patterns that make managing data easier and more efficient, and it is designed for high performance, allowing fast access to database records with simple, clean Python.

    Alembic is a lightweight database migration tool built to be used with SQLAlchemy. It performs "migrations" to change the structure of the database; changes include adding tables, updating models, and so on.

    Each SQLAlchemy model follows the same pattern, so I'll show just one example. See the source code for the rest of the entities.

    import uuid
    from src.core.database import Base
    from sqlalchemy.dialects.postgresql import UUID
    from sqlalchemy import types, ForeignKey, func, DateTime, Column
    from sqlalchemy.orm import relationship

    class BlogPosts(Base):
        __tablename__ = 'blog_posts'

        id = Column(types.INTEGER, primary_key=True, autoincrement=True)
        guid = Column(UUID(as_uuid=True),
                      primary_key=False,
                      unique=True,
                      nullable=False,
                      server_default=func.uuid_generate_v4(),
                      default=uuid.uuid4)
        title = Column(types.String)
        content = Column(types.TEXT)
        created_at = Column(DateTime, default=func.now(), nullable=False)
        updated_at = Column(DateTime, default=func.now(), onupdate=func.now(), nullable=False)
        workflow_guid = Column(ForeignKey("workflows.guid"))

        image_guid = Column(ForeignKey("images.guid"))
        image = relationship("Images")

    Here, we define our blog_posts table, which is shown in the ERD. We define each column and its associated type. Postgres' built-in function uuid_generate_v4() generates a unique identifier for the guid column, and func.now() generates the timestamps for the created_at and updated_at columns. We then define our workflow and image relationships using ForeignKey. Finally, we use relationship() to allow easy access to the image related to a blog post through the ORM.
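
    As a quick illustration (not from the original post), loading a post together with its related image through that relationship could look like this with the async session defined later; selectinload is used so the related row is fetched eagerly inside the session:

    from sqlalchemy import select
    from sqlalchemy.orm import selectinload

    from src.content.models.blog_posts import BlogPosts
    from src.core.database import AsyncSessionFactory

    async def show_blog_image(blog_id: int) -> None:
        async with AsyncSessionFactory() as session:
            # Eagerly load the related Images row so no lazy load happens outside the session
            stmt = select(BlogPosts).options(selectinload(BlogPosts.image)).where(BlogPosts.id == blog_id)
            post = (await session.execute(stmt)).scalar_one()
            print(post.title, post.image.url if post.image else None)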

    To use the uuid_generate_v4() function with Postgres, we must ensure the extension is enabled in our database.

    #!/bin/bash
    set -e

    psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL
        CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
    EOSQL

    This script is passed into our Docker instance running Postgres and enables the extension.

    Now that our model is defined and the extension is configured, let's create a migration.

    In the root directory, run the following command:

    alembic init alembic

    This will generate the following files:

    • /versions directory: contains each generated migration
    • env.py: used to configure Alembic
    • README: generic information from Alembic
    • script.py.mako
    • alembic.ini

    After these files are generated, we need to modify alembic.ini and env.py. In alembic.ini, remove the following line:

    sqlalchemy.url =

    Next, we modify a few lines in env.py:

    ## import necessary source code
    from src.core.database import Base
    from src.settings.config import Settings
    from src.content.models.workflows import Workflow
    from src.content.models.blog_posts import BlogPosts
    from src.content.models.social_media_post import SocialMediaPosts
    from src.content.models.images import Images

    # alembic generated code

    target_metadata = Base.metadata

    settings = Settings()
    config.set_main_option("sqlalchemy.url", settings.DATABASE_URL)

    # rest of alembic generated code

    The models must be imported into this file for the migrations to run. We also import Base from our database.py:

    from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker
    from src.core.logs import logger
    from src.settings import settings

    Base = declarative_base()

    # for alembic
    sync_engine = create_engine(settings.DATABASE_URL, echo=True)

    async_engine = create_async_engine(settings.ASYNC_DATABASE_URL, echo=True, future=True)

    AsyncSessionFactory = sessionmaker(
        bind=async_engine,
        class_=AsyncSession,
        expire_on_commit=False,
        autocommit=False,
        autoflush=False,
    )

    async def get_db():
        async with AsyncSessionFactory() as session:
            yield session

    Base contains the metadata of our models, which each of our SQLAlchemy models inherits. Alembic is synchronous by nature, whereas FastAPI is asynchronous. To avoid overcomplicating the Alembic configuration to make it async, I defined two database engines: a synchronous one with its own URL used by Alembic, and an async one used by FastAPI. Here is an example of their URLs:

    ASYNC_DATABASE_URL=postgresql+asyncpg://postgres-user:postgres-pw@localhost:5432/demo # EXAMPLE
    DATABASE_URL=postgresql://postgres-user:postgres-pw@localhost:5432/demo #EXAMPLE
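
    The Settings class itself lives in src/settings/config.py and isn't shown in this post; a minimal sketch built on pydantic-settings might look like the following, where the field names mirror the ones used elsewhere in this post and everything else is an assumption:

    from pydantic_settings import BaseSettings, SettingsConfigDict

    class Settings(BaseSettings):
        # Values are read from the environment or a local .env file (an assumption)
        model_config = SettingsConfigDict(env_file=".env", extra="ignore")

        DATABASE_URL: str
        ASYNC_DATABASE_URL: str
        OPENAI_API_KEY: str
        OPENAI_MODEL: str
        TAVILY_SEARCH_API_KEY: str
        MINIO_ENDPOINT: str
        MINIO_BUCKET_NAME: str
        AWS_ACCESS_KEY_ID: str
        AWS_SECRET_ACCESS_KEY: str

    # database.py imports a ready-made instance (from src.settings import settings),
    # so an instance like this is presumably exported from the settings package
    settings = Settings()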

    With everything configured, we can run the following two commands to generate a migration and update our database:

    alembic revision --autogenerate -m "Initial migration"
    alembic upgrade head # updates database tables & schemas

    With the database layer in place, here is the updated workflow, which now uses the new services:

    from llama_index.core.workflow import Workflow, Event, StartEvent, StopEvent, Context, step
    from sqlalchemy.orm import Session
    from src.content.services.workflow_service import WorkflowService
    from src.content.services.blog_service import BlogService
    from src.content.services.social_media_service import SocialMediaService
    from src.content.services.image_service import ImageService
    from src.content.schemas.workflow import WorkflowStatusType, WorkflowCreate, WorkflowUpdate
    from src.content.schemas.blog_posts import BlogCreate, BlogUpdate
    from src.content.schemas.social_media_post import SocialMediaCreate, PlatformType, SocialMediaUpdate
    from src.content.schemas.tavily_search import TavilySearchInput
    from src.content.schemas.images import ImageCreate
    from src.content.prompts.prompts import *
    from src.content.services.tavily_search_service import tavily_search
    from llama_index.llms.openai import OpenAI as LlamaOpenAI
    from openai import OpenAI
    from src.core.logs import logger

    class ResearchEvent(Event):
        topic: str
        research: bool

    class BlogEvent(Event):
        topic: str
        research: bool
        research_material: str

    class SocialMediaEvent(Event):
        blog: str

    class SocialMediaCompleteEvent(Event):
        pass

    class IllustratorEvent(Event):
        blog: str

    class IllustratorCompleteEvent(Event):
        url: str

    class ProgressEvent(Event):
        msg: str

    class RetryEvent(Event):
        pass

    class WorkflowFailedEvent(Event):
        error: str

    class AdvancedWorkflow(Workflow):

        def __init__(self, settings, db: Session, workflow_service: WorkflowService, blog_service: BlogService, social_media_service: SocialMediaService, image_service: ImageService, timeout=None, verbose=None):
            super().__init__(timeout, verbose)
            self.settings = settings
            self.db = db
            self.workflow_service = workflow_service
            self.blog_service = blog_service
            self.social_media_service = social_media_service
            self.image_service = image_service

        @step
        async def start_event(self, ev: StartEvent, ctx: Context) -> ResearchEvent | BlogEvent | WorkflowFailedEvent:
            ctx.write_event_to_stream(ProgressEvent(msg="Starting content creation workflow"))
            workflow_data = WorkflowCreate(status=WorkflowStatusType.INPROGRESS)
            try:
                workflow = await self.workflow_service.create_workflow(workflow_data=workflow_data)
                await ctx.set(key="workflow_id", value=workflow.id)
                await ctx.set(key="workflow_guid", value=workflow.guid)
                if ev.research:
                    return ResearchEvent(topic=ev.topic, research=ev.research)
                return BlogEvent(topic=ev.topic, research=ev.research, research_material="None")
            except Exception as e:
                return WorkflowFailedEvent(error=f"{e}")

        @step
        async def research_event(self, ev: ResearchEvent, ctx: Context) -> BlogEvent | WorkflowFailedEvent:
            ctx.write_event_to_stream(ProgressEvent(msg=f"Searching the internet for information about {ev.topic}"))
            try:
                search_input = TavilySearchInput(
                    query=ev.topic,
                    max_results=3,
                    search_depth="basic"
                )
                research_material = await tavily_search(search_input, api_key=self.settings.TAVILY_SEARCH_API_KEY)
                return BlogEvent(topic=ev.topic, research=ev.research, research_material=research_material)
            except Exception as e:
                return WorkflowFailedEvent(error=f"{e}")

        @step
        async def blog_event(self, ev: BlogEvent, ctx: Context) -> SocialMediaEvent | WorkflowFailedEvent:
            ctx.write_event_to_stream(ProgressEvent(msg="Writing blog post"))
            prompt_template = ""
            workflow_guid = await ctx.get("workflow_guid")
            try:
                if ev.research:
                    prompt_template = BLOG_AND_RESEARCH_TEMPLATE.format(query_str=ev.topic, research=ev.research_material)
                else:
                    prompt_template = BLOG_TEMPLATE.format(query_str=ev.topic)
                llm = LlamaOpenAI(model=self.settings.OPENAI_MODEL, api_key=self.settings.OPENAI_API_KEY)
                response = await llm.acomplete(prompt_template)
                blog_data = BlogCreate(title=ev.topic, content=response.text, workflow_guid=workflow_guid)
                blog_post = await self.blog_service.create_blog(blog_data=blog_data)
                await ctx.set(key="blog_id", value=blog_post.id)
                ctx.send_event(SocialMediaEvent(blog=blog_data.content))

            except Exception as e:
                return WorkflowFailedEvent(error=f"{e}")

        @step
        async def social_media_event(self, ev: SocialMediaEvent, ctx: Context) -> SocialMediaCompleteEvent | IllustratorEvent | WorkflowFailedEvent:
            ctx.write_event_to_stream(ProgressEvent(msg="Writing social media post"))
            workflow_guid = await ctx.get("workflow_guid")
            try:
                prompt_template = LINKED_IN_TEMPLATE.format(blog_content=ev.blog)
                llm = LlamaOpenAI(model=self.settings.OPENAI_MODEL, api_key=self.settings.OPENAI_API_KEY)
                response = await llm.acomplete(prompt_template)
                sm_data = SocialMediaCreate(content=response.text, platform_type=PlatformType.LINKEDIN, workflow_guid=workflow_guid)
                sm_post = await self.social_media_service.create_social_media_post(social_media_data=sm_data)
                await ctx.set(key="sm_id", value=sm_post.id)
                ctx.send_event(IllustratorEvent(blog=ev.blog))
                return SocialMediaCompleteEvent()
            except Exception as e:
                return WorkflowFailedEvent(error=f"{e}")

        @step
        async def illustration_event(self, ev: IllustratorEvent, ctx: Context) -> IllustratorCompleteEvent | WorkflowFailedEvent:
            ctx.write_event_to_stream(ProgressEvent(msg="Drawing illustration for content"))
            try:
                llm = LlamaOpenAI(model=self.settings.OPENAI_MODEL, api_key=self.settings.OPENAI_API_KEY)
                image_prompt_instructions_generator = IMAGE_GENERATION_TEMPLATE.format(blog_post=ev.blog)
                image_prompt = await llm.acomplete(image_prompt_instructions_generator, formatted=True)
                openai_client = OpenAI(api_key=self.settings.OPENAI_API_KEY)
                file_name = await self.image_service.generate_and_upload_image(bucket=self.settings.MINIO_BUCKET_NAME, openai_client=openai_client, image_prompt=image_prompt.text)
                url = f"{self.settings.MINIO_ENDPOINT}/{self.settings.MINIO_BUCKET_NAME}/{file_name}"
                image_data = ImageCreate(url=url)
                image = await self.image_service.create_image(image_data=image_data)
                await ctx.set("image_guid", image.guid)
                return IllustratorCompleteEvent(url=url)
            except Exception as e:
                return WorkflowFailedEvent(error=f"{e}")

        @step
        async def step_workflow_success(self, ev: SocialMediaCompleteEvent | IllustratorCompleteEvent, ctx: Context) -> StopEvent | WorkflowFailedEvent:
            if ctx.collect_events(ev, [SocialMediaCompleteEvent, IllustratorCompleteEvent]) is None:
                return None

            workflow_id = await ctx.get("workflow_id")
            image_guid = await ctx.get("image_guid")
            blog_id = await ctx.get("blog_id")
            sm_id = await ctx.get("sm_id")
            workflow_update_data = WorkflowUpdate(id=workflow_id, status=WorkflowStatusType.COMPLETE)
            blog_update_data = BlogUpdate(id=blog_id, image_guid=image_guid)
            sm_update_data = SocialMediaUpdate(id=sm_id, image_guid=image_guid)

            try:
                await self.workflow_service.update_workflow(workflow_id, workflow_update_data)
                await self.blog_service.update_blog(blog_id=blog_id, blog_data=blog_update_data)
                await self.social_media_service.update_social_media_post(sm_id=sm_id, sm_data=sm_update_data)
                return StopEvent(result="Done")
            except Exception as e:
                return WorkflowFailedEvent(error=f"{e}")

        @step
        async def step_workflow_failed(self, ev: WorkflowFailedEvent, ctx: Context) -> StopEvent:
            try:
                workflow_id = await ctx.get("workflow_id")
                workflow_update_data = WorkflowUpdate(id=workflow_id, status=WorkflowStatusType.FAILED)
                await self.workflow_service.update_workflow(workflow_id, workflow_update_data)
                return StopEvent(result="Failed")
            except:
                logger.error(ev.error)
                return StopEvent(result="Failed")

    Our workflow has largely stayed the same, apart from the addition of services to handle the logic for managing entities in Postgres and Minio. We also added a failure step in case anything goes wrong. Note that the social media step now triggers the illustration step. This avoids the same session performing two operations concurrently and causing a collision. In the future, we could pass in a session factory to enable parallel processing, but I didn't for the sake of time.

    In Domain-Driven Design (DDD), tactical patterns are design guidelines that help organize and structure code within a bounded context. We implement the repository and service patterns described above in our code base. The patterns are repeatable, so as before I show one example, in this case for images, since they include extra logic for generating and uploading images.

    First, we define an interface for images that the ImageRepository must implement.

    from abc import ABC, abstractmethod
    from src.content.schemas.images import ImageCreate, Image

    class IImagesRepository(ABC):

        @abstractmethod
        async def create_image(self, image_data: ImageCreate) -> Image:
            pass

    Then we define the ImageRepository:

    from src.content.repositories.images.images_repository_interface import IImagesRepository
    from src.content.schemas.images import Image, ImageCreate
    from src.content.models.images import Images
    from sqlalchemy.ext.asyncio import AsyncSession

    class ImageRepository(IImagesRepository):
        def __init__(self, async_db_session: AsyncSession):
            self.db = async_db_session

        async def create_image(self, image_data) -> Image:
            if not isinstance(image_data, ImageCreate):
                raise ValueError("Expected an instance of ImageCreate")

            try:
                new_image_data = Images(url=image_data.url)

                self.db.add(new_image_data)
                await self.db.commit()
                await self.db.refresh(new_image_data)

                return Image(
                    id=new_image_data.id,
                    guid=new_image_data.guid,
                    url=new_image_data.url
                )
            except:
                await self.db.rollback()
                raise

    The interface defines one create_image method. The ImageRepository implements this method by checking whether the data passed in matches the Pydantic schema ImageCreate. If it is a valid payload, it creates a new Images instance, passing in the URL generated from the Minio upload (see the service file). The id, guid, created_at, and updated_at are auto-generated. It then uses the session methods to add and commit the entity to the database. The image is returned to the caller for further processing; in this case, the guid will be used to map the relationship between the blog_posts and social_media_posts image_guid fields.

    Finally, we define the ImageService file:

    import uuid
    import requests
    from io import BytesIO
    from src.content.repositories.images.images_repository import ImageRepository
    from src.content.schemas.images import ImageCreate, Image as ImageSchema
    from src.content.models.images import Images
    from PIL import Image as PilImage
    from botocore.exceptions import NoCredentialsError, ParamValidationError
    from tenacity import retry, wait_random_exponential, stop_after_attempt
    from openai import OpenAI
    from src.core.logs import logger

    class ImageService(ImageRepository):

        def __init__(self, image_repository: ImageRepository, s3_client):
            self.repository = image_repository
            self.s3_client = s3_client

        async def create_image(self, image_data: ImageCreate) -> ImageSchema:
            image = await self.repository.create_image(image_data)
            return ImageSchema.model_validate(image)

        @retry(wait=wait_random_exponential(min=1, max=15), stop=stop_after_attempt(3))
        async def generate_image(self, client: OpenAI, prompt: str):
            try:
                response = client.images.generate(
                    model="dall-e-3",  # will make configurable in future
                    prompt=prompt,
                    size="1024x1024",  # will make configurable in future
                    quality="standard",  # will make configurable in future
                    n=1
                )
                return response
            except:
                raise Exception("Failed to generate image")

        async def generate_and_upload_image(self, bucket, openai_client: OpenAI, image_prompt):
            try:
                generated_image = await self.generate_image(client=openai_client, prompt=image_prompt)
                image_url = generated_image.data[0].url
                response = requests.get(image_url)
                image = PilImage.open(BytesIO(response.content))
                image_bytes = BytesIO()
                image.save(image_bytes, format='PNG')
                image_bytes.seek(0)
                file_name = f"test_{uuid.uuid4()}.png"
                await self.upload_to_minio(bucket, image_bytes.getvalue(), file_name)

                return file_name
            except:
                raise Exception("Failed to upload to minio or create database entry")

        async def upload_to_minio(self, bucket, file_data, filename):
            try:
                self.s3_client.put_object(Bucket=bucket, Key=filename, Body=file_data)
                logger.error(msg=f"Uploaded {filename} to MinIO bucket successfully!")
            except NoCredentialsError:
                logger.error(msg="Credentials not available.")
            except ParamValidationError as e:
                logger.error(msg=f"Parameter validation failed: {e}")

    The ImageService class defines four methods:

    • create_image: uses the ImageRepository to save images to the database.
    • generate_image: uses the OpenAI API and dall-e-3 to generate an image. It is wrapped by the tenacity decorator to perform exponential backoff and retry. The image generation API tends to be finicky, so this helps retry when errors are thrown for no apparent reason other than issues on the API side.
    • generate_and_upload_image: calls the generate_image and upload_to_minio methods to create an image based on the blog post and upload the generated image to Minio.
    • upload_to_minio: uploads data to Minio using the s3_client.

    These layers of abstraction let us separate application logic from business logic (services) and data access (repositories), allowing for easier maintenance and testing.

    💡 Some may argue that these patterns hurt performance. The impact is usually negligible and won't be a cause for concern for small to mid-size applications. In my opinion, the improved maintainability is worth it until a performance refactor is justifiable.

    We will run Postgres and Minio in Docker containers to experiment with them. Later in the series, I'll show my home lab setup, where I run Postgres in Kubernetes and Minio on my NAS (using TrueNAS applications).

    model: "3"
    providers:
    postgres:
    picture: postgres:16
    hostname: postgres
    ports:
    - "5432:5432"
    setting:
    POSTGRES_USER: postgres-user # EXAMPLE ONLY CHANGE
    POSTGRES_PASSWORD: postgres-pw #EXAMPLE ONLY CHANGE
    POSTGRES_DB: demo
    volumes:
    - ./information:/var/lib/postgresql/information
    - ./infrastructure/init-db.sh:/docker-entrypoint-initdb.d/init-db.sh:ro
    minio:
    picture: quay.io/minio/minio
    command: server /information --console-address :9001
    restart: unless-stopped
    ports:
    - "9000:9000"
    - "9001:9001"
    setting:
    MINIO_ACCESS_KEY: minioadmin # EXAMPLE CHANGE
    MINIO_SECRET_KEY: minioadmin # EXAMPLE ONLY CHANGE
    MINIO_ROOT_USER: minioadmin # EXAMPLE ONLY CHANGE
    MINIO_ROOT_PASSWORD: minioadmin #EXAMPLE ONLY CHANGE
    volumes:
    - minio_data:/information
    volumes:
    pgdata:
    minio_data:

    For each service, we define:

    • image: postgres:16 | quay.io/minio/minio
    • (optional) command
    • ports
    • environment: a list of environment variables
    • volumes: used to persist data in the local directory

    To start these containers, make sure you have the Docker runtime installed and run the following command:

    docker-compose up -d

    This will launch Postgres and Minio in the background.

    You can check that the services are running correctly using:

    docker ps

    Look for output similar to the following:
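
    The original screenshot isn't reproduced here, but the output should look roughly like this (container IDs, names, and timings will differ):

    CONTAINER ID   IMAGE                 COMMAND                  CREATED         STATUS         PORTS                              NAMES
    1a2b3c4d5e6f   postgres:16           "docker-entrypoint.s…"   2 minutes ago   Up 2 minutes   0.0.0.0:5432->5432/tcp             project-postgres-1
    6f5e4d3c2b1a   quay.io/minio/minio   "/usr/bin/docker-ent…"   2 minutes ago   Up 2 minutes   0.0.0.0:9000-9001->9000-9001/tcp   project-minio-1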

    In this post, we enhanced our FastAPI application with real-time communication capabilities and improved data management. Key enhancements include integrating WebSocket connections to stream progress events through a workflow, introducing SQLAlchemy and Alembic for database interactions and migrations with Postgres, expanding our code structure using Domain-Driven Design patterns to separate core and content functionality, and improving our dependency management with pyproject.toml and pip-tools.

    Thanks for tuning into part 3 of the GenAI Content Creator series. In part 4, we will add a user interface, allow users to interact with the workflow throughout its lifecycle, and let them update their content using a rich markdown editor.


