Welcome to part 3 of this series. In the previous post, we adapted our Jupyter Notebook implementation from part 1 into a FastAPI server that can serve users. We learned how to set up FastAPI and configure settings, as well as how to trigger workflows from REST endpoints.
Previously, we identified some limitations in our solution. Users must wait for the response until the workflow is completed. We have also been saving our content in a local folder. While that is fine for quick experimentation, in this post we will expand the solution to send progress events and use an appropriate storage mechanism.
In addition to improving our solution, we will continue to abstract our code into succinct modules and pin our dependencies to ensure build reproducibility.
- Streaming progress events to users throughout a workflow
- Using the SQLAlchemy ORM to interact with Postgres
- Running migrations with Alembic
- Saving images to Minio
- Running multiple Docker applications together with Docker Compose
- Configuring the WebSocket endpoint in the FastAPI application
├── README.md
├── alembic
│   ├── README
│   ├── env.py
│   ├── script.py.mako
│   └── versions
├── alembic.ini
├── docker-compose.yml
├── infrastructure
│   └── init-db.sh
├── pyproject.toml
├── requirements
│   ├── requirements-dev.txt
│   └── requirements.txt
└── src
    ├── content
    │   ├── api.py
    │   ├── models
    │   ├── prompts
    │   ├── repositories
    │   ├── schemas
    │   ├── services
    │   └── workflows
    ├── core
    │   ├── api.py
    │   ├── database.py
    │   ├── logs.py
    │   └── middleware.py
    ├── main.py
    └── settings
        └── config.py
Compared to the previous post, the overall structure of our project has expanded. We have a few top-level directories:
- alembic: We will discuss this more in the SQLAlchemy section below, but this directory contains our database migrations. A migration essentially changes the structure of the database. These files are auto-generated, and we only need to modify env.py and alembic.ini.
- alembic.ini: An auto-generated config file. We only need to remove one line from it, as most of the configuration is set up in env.py.
- docker-compose.yml: Contains the configuration to run Postgres and Minio in containers on the same network, with storage provisioned using volumes.
- infrastructure: Our database models use UUIDs for GUIDs. This directory contains a script to enable the uuid-ossp extension the first time Postgres runs.
- pyproject.toml: A configuration file for packaging-related tools.
- requirements: Holds our production and development dependencies pinned by version. The section on setting up dependencies covers this in more detail.
Source Directory
The source directory contains the bulk of the source code. The code is structured around two domains: core and content. The core domain consists of files that affect the server holistically (middleware) or contain logic used by other domains, such as database sessions, logging functions, and a health check for a later Kubernetes deployment. The content domain uses the DDD tactical patterns Repository and Service to cleanly abstract code. Each domain (users will be next) follows the same general structure:
- Models: SQLAlchemy models that define the database schema.
- Schemas: Pydantic models that handle input and output data validation and serialization.
- Repositories: Responsible for data access. Separating data access logic makes testing and maintenance easier.
- Services: Coordinate business logic, interfacing between application logic and data access logic (repositories).
The content domain additionally has:
- Prompts: a central place to store prompts for now; other tools later in the series will handle prompt versioning and a more robust approach. A minimal sketch of this module follows this list.
- Workflows: stores all workflows.
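For reference, the prompts module is just a set of plain string templates imported by the workflows. The sketch below is illustrative only: the template names and format placeholders match the ones used in the workflow later in this post, but the actual wording lives in the source code.
# src/content/prompts/prompts.py (illustrative sketch; real wording differs)
BLOG_TEMPLATE = """You are a technical writer. Write a blog post about: {query_str}"""

BLOG_AND_RESEARCH_TEMPLATE = """You are a technical writer. Write a blog post about: {query_str}
Use the following research material as context:
{research}"""

LINKED_IN_TEMPLATE = """Rewrite the following blog post as a LinkedIn post:
{blog_content}"""

IMAGE_GENERATION_TEMPLATE = """Write an image-generation prompt for an illustration of this blog post:
{blog_post}"""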
In part 2, we set up our virtual environment using Conda and installed all our dependencies through the command line. This works initially, but the underlying dependencies can change, breaking the source code. In this project, we introduce pyproject.toml, which has become the de facto standard for configuring Python applications.
[build-system]
requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta"

[project]
name = "content_assistant"
authors = [{ name = "Markell Richards" }]
version = "0.0.1"
requires-python = ">=3.12"
readme = "README.md"
dependencies = [
    "fastapi",
    "uvicorn[standard]",
    "tenacity",
    "asyncio",
    "llama-index-core",
    "llama-index-llms-openai",
    "tavily-python",
    "openai",
    "pydantic-settings",
    "minio",
    "JSON-log-formatter",
    "sqlalchemy",
    "alembic",
    "psycopg2-binary",
    "boto3",
    "asyncpg",
]

[project.optional-dependencies]
dev = ["ruff", "black"]
With our application metadata and dependencies defined, we need to generate a requirements.txt for pip to install. We will use pip-tools, which consists of pip-compile and pip-sync. pip-compile pins the dependencies so pip knows how to resolve them and which versions to install. Good dependency management revolves around dependency resolving and dependency locking. These two concepts give us deterministic builds, ensuring the app is built the same regardless of where and when.
To generate the hashes and lock versions, we use the following commands:
pip-compile --generate-hashes -o requirements/requirements.txt pyproject.toml
pip-compile --extra dev -o requirements/requirements-dev.txt pyproject.toml
We use requirements-dev.txt for local development, as it includes the extra dependencies defined in the dev section of our pyproject.toml. These aren't needed for the production build and are excluded from requirements.txt.
To install the dependencies, we use:
pip-sync requirements/requirements.txt requirements/requirements-dev.txt
pip-sync installs dependencies based on the output of pip-compile.
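For a sense of what the lock file looks like, here is an abbreviated, illustrative excerpt of a hash-pinned entry; the version and hashes are placeholders, not the real lock output:
# requirements/requirements.txt (illustrative excerpt, placeholder values)
fastapi==0.115.0 \
    --hash=sha256:<hash-one> \
    --hash=sha256:<hash-two>
    # via content_assistant (pyproject.toml)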
Each domain has its own api.py with a set of routes. These are defined as routers and included in our main.py, as shown here:
app = FastAPI(version=version)
app.include_router(core_router)
app.include_router(content_router)
In part 6 of this series, we will deploy our full-stack application to Kubernetes. Typically, the containers in Kubernetes pods should have a way to report the health of the application. More on this will be covered later, but for now, we define a simple health check endpoint that returns the version of our API (the version is specified in pyproject.toml):
from fastapi import APIRouter, status
from fastapi.requests import Request

router = APIRouter(tags=["Core Endpoints"])

@router.get("/health", status_code=status.HTTP_200_OK)
async def healthcheck(request: Request) -> dict:
    return {"version": request.app.version}
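Assuming you start the server locally with uvicorn on its default port (the module path and port here are assumptions about your setup), you can exercise the endpoint with a quick request:
uvicorn src.main:app --reload        # start the API locally
curl http://localhost:8000/health    # expected response: {"version":"0.0.1"}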
We establish a WebSocket connection to provide users with real-time updates as a workflow proceeds through its steps.
💡 This is a minimal and simple example of streaming updates over a WebSocket connection. It will need to be improved before being used in production.
router = APIRouter(tags=["Content Endpoints"])

@router.websocket("/content")
async def advancedContentFlow(websocket: WebSocket, db: Session = Depends(get_db), settings: Settings = Depends(get_settings)):
    await websocket.accept()

    s3_client = boto3.client(
        's3',
        endpoint_url=settings.MINIO_ENDPOINT,
        aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
        aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
        region_name='us-east-1',
        # Disable SSL verification if using HTTP
        config=boto3.session.Config(signature_version='s3v4')
    )

    workflow_repository = WorkflowRepository(db)
    blog_repository = BlogRepository(db)
    social_media_repository = SocialMediaRepository(db)
    image_repository = ImageRepository(db)

    workfow_service = WorkflowService(workflow_repository)
    blog_service = BlogService(blog_repository)
    social_media_service = SocialMediaService(social_media_repository)
    image_service = ImageService(image_repository=image_repository, s3_client=s3_client)

    workflow = AdvancedWorkflow(settings, db, workfow_service=workfow_service, blog_service=blog_service,
                                social_media_service=social_media_service, image_service=image_service)

    try:
        data = await websocket.receive_json()
        logger.info(data)
        handler: WorkflowHandler = workflow.run(topic=data["topic"], research=data["research"])
        async for event in handler.stream_events():
            if isinstance(event, ProgressEvent):
                await websocket.send_json({
                    "type": "progress_event",
                    "payload": str(event.msg)
                })
        result = await handler
        await websocket.send_json({
            "type": "results",
            "payload": result
        })
    except Exception as e:
        await db.rollback()
        await websocket.send_json({
            "type": "error",
            "payload": "Something went wrong"
        })
        logger.error(e)
    finally:
        await websocket.close()
Stepping through the code:
- Using dependency injection, we inject a database session and settings object into each connection.
- Accept the incoming WebSocket connection.
- Minio is S3-compatible object storage. We use the AWS boto3 SDK to interact with Minio. We create an s3_client to pass into our image_service, which contains the logic to upload images.
- We create instances of each repository and service type: workflow, blog, social media, and image.
- We create an instance of AdvancedWorkflow and pass in settings, the db session, and every service.
- We accept a JSON payload containing a content topic and a research boolean flag.
- We run our workflow and listen for ProgressEvent. As the workflow progresses, each step publishes a ProgressEvent with a msg that is sent to the client.
- Once the workflow finishes, the user gets a result payload indicating whether the workflow completed or failed.
- Error handling in case something goes wrong.
- Finally, we close the WebSocket connection.
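To try the endpoint from the client side, here is a minimal sketch using the websockets library. The URL and payload shape follow the endpoint above, but the host, port, and topic are assumptions about a local setup:
# minimal client sketch, assuming the API runs locally on port 8000
import asyncio
import json
import websockets

async def main():
    async with websockets.connect("ws://localhost:8000/content") as ws:
        # kick off the workflow with a topic and a research flag
        await ws.send(json.dumps({"topic": "Vector databases", "research": True}))
        async for message in ws:
            event = json.loads(message)
            print(event["type"], event["payload"])
            if event["type"] in ("results", "error"):
                break

asyncio.run(main())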
As mentioned, we added Postgres and Minio to the project to persist the entities created throughout the workflows for later retrieval. Above is a simple entity relationship diagram (ERD) of how the tables map together. This is subject to change, but at this point the structure gives us some general access patterns:
- Each blog and social media post belongs to one workflow. If we fetch a workflow, we can grab all related entities.
- Within a single workflow, the blog and social media posts share the same generated image. However, the user can later change the media through the interface.
- A user can edit the content of blog and social media posts in later enhancements.
💡 There is a situation where images can become orphaned. How would you handle the cleanup process? (This will be revealed later in the series.)
To facilitate interaction with the database and management of its structure, we use SQLAlchemy and Alembic. SQLAlchemy is an object-relational mapper (ORM) that helps you interact with databases using Python. It offers various patterns that make managing data easier and more efficient. SQLAlchemy is designed for high performance, allowing quick access to database records using simple and clear Python.
Alembic is a lightweight database migration tool built to be used with SQLAlchemy. It performs "migrations" to change the structure of the database: adding tables, updating models, and so on.
Each SQLAlchemy model follows the same pattern, so I'll show just one example. See the source code for the rest of the entities.
import uuid
from src.core.database import Base
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy import types, ForeignKey, func, DateTime, Column
from sqlalchemy.orm import relationship

class BlogPosts(Base):
    __tablename__ = 'blog_posts'
    id = Column(types.INTEGER, primary_key=True, autoincrement=True)
    guid = Column(UUID(as_uuid=True),
                  primary_key=False,
                  unique=True,
                  nullable=False,
                  server_default=func.uuid_generate_v4(),
                  default=uuid.uuid4)
    title = Column(types.String)
    content = Column(types.TEXT)
    created_at = Column(DateTime, default=func.now(), nullable=False)
    updated_at = Column(DateTime, default=func.now(), onupdate=func.now(), nullable=False)
    workflow_guid = Column(ForeignKey("workflows.guid"))
    image_guid = Column(ForeignKey("images.guid"))
    image = relationship("Images")
Here, we define our blog_posts table, which is shown in the ERD. We define each column and its associated type. Postgres' built-in function uuid_generate_v4() generates a unique identifier for the guid column. func.now() generates the timestamps for the created_at and updated_at columns. We then define our workflow and image relationships using ForeignKey. Finally, we use relationship() so the ORM gives us easy access to the image related to a blog post.
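As a quick illustration of what the relationship buys us, a hypothetical query against the async session can eagerly load a blog post's image; selectinload is one way to avoid lazy loading inside an async session:
# hypothetical usage sketch of the BlogPosts -> Images relationship
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from src.content.models.blog_posts import BlogPosts

async def get_blog_with_image(session, blog_id: int):
    stmt = (
        select(BlogPosts)
        .options(selectinload(BlogPosts.image))  # eager-load the related image
        .where(BlogPosts.id == blog_id)
    )
    result = await session.execute(stmt)
    blog = result.scalar_one_or_none()
    if blog is not None and blog.image is not None:
        print(blog.title, blog.image.url)
    return blog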
To use the uuid_generate_v4() function with Postgres, we must ensure the extension is enabled in our database.
set -e

psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL
    CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
EOSQL
This script is passed into our Docker container running Postgres and enables the extension.
Now that our model is defined and the extension is configured, let's create a migration.
In the root directory, run the following command:
alembic init alembic
This will generate the following files:
- versions/ directory: contains each generated migration
- env.py: used to configure Alembic
- README: generic information from Alembic
- script.py.mako
- alembic.ini
After these files are generated, we need to modify alembic.ini and env.py. In alembic.ini, remove the following line:
sqlalchemy.url =
Next, we modify a few lines in env.py:
## import necessary source code
from src.core.database import Base
from src.settings.config import Settings
from src.content.models.workflows import Workflow
from src.content.models.blog_posts import BlogPosts
from src.content.models.social_media_post import SocialMediaPosts
from src.content.models.images import Images

# alembic generated code

target_metadata = Base.metadata

settings = Settings()
config.set_main_option("sqlalchemy.url", settings.DATABASE_URL)

# rest of alembic generated code
The models must be imported into this file for the migrations to run. We also import Base from our database.py:
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from src.core.logs import logger
from src.settings import settings

Base = declarative_base()

# for alembic
sync_engine = create_engine(settings.DATABASE_URL, echo=True)

async_engine = create_async_engine(settings.ASYNC_DATABASE_URL, echo=True, future=True)

AsyncSessionFactory = sessionmaker(
    bind=async_engine,
    class_=AsyncSession,
    expire_on_commit=False,
    autocommit=False,
    autoflush=False,
)

async def get_db():
    async with AsyncSessionFactory() as session:
        yield session
Base contains the metadata of our models, which each of our SQLAlchemy models inherits. Alembic is synchronous by nature, while FastAPI is asynchronous. To avoid overcomplicating the Alembic configuration by making it async, I defined two database engines: a synchronous one, used by Alembic, that points at a different URL, and an async one used by FastAPI. Here is an example of their URLs:
ASYNC_DATABASE_URL=postgresql+asyncpg://postgres-user:postgres-pw@localhost:5432/demo # EXAMPLE
DATABASE_URL=postgresql://postgres-user:postgres-pw@localhost:5432/demo #EXAMPLE
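Both URLs live in src/settings/config.py alongside the rest of the configuration. Below is a minimal sketch of that settings class, assuming pydantic-settings and a .env file; the field names come from the code in this post, but the exact layout of the real file may differ:
# src/settings/config.py (illustrative sketch)
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env")

    DATABASE_URL: str        # sync URL, used by Alembic
    ASYNC_DATABASE_URL: str  # async URL, used by FastAPI at runtime
    OPENAI_API_KEY: str
    OPENAI_MODEL: str
    TAVILY_SEARCH_API_KEY: str
    MINIO_ENDPOINT: str
    MINIO_BUCKET_NAME: str
    AWS_ACCESS_KEY_ID: str
    AWS_SECRET_ACCESS_KEY: str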
With everything configured, we can run the following two commands to generate a migration and update our database:
alembic revision --autogenerate -m "Initial migration"
alembic upgrade head # updates database tables & schemas
from llama_index.core.workflow import Workflow, Event, StartEvent, StopEvent, Context, step
from sqlalchemy.orm import Session
from src.content.services.workflow_service import WorkflowService
from src.content.services.blog_service import BlogService
from src.content.services.social_media_service import SocialMediaService
from src.content.services.image_service import ImageService
from src.content.schemas.workflow import WorkflowStatusType, WorkflowCreate, WorkflowUpdate
from src.content.schemas.blog_posts import BlogCreate, BlogUpdate
from src.content.schemas.social_media_post import SocialMediaCreate, PlatformType, SocialMediaUpdate
from src.content.schemas.tavily_search import TavilySearchInput
from src.content.schemas.images import ImageCreate
from src.content.prompts.prompts import *
from src.content.services.tavily_search_service import tavily_search
from llama_index.llms.openai import OpenAI as LlamaOpenAI
from openai import OpenAI
from src.core.logs import logger

class ResearchEvent(Event):
    topic: str
    research: bool

class BlogEvent(Event):
    topic: str
    research: bool
    research_material: str

class SocialMediaEvent(Event):
    blog: str

class SocialMediaCompleteEvent(Event):
    pass

class IllustratorEvent(Event):
    blog: str

class IllustratorCompleteEvent(Event):
    url: str

class ProgressEvent(Event):
    msg: str

class RetryEvent(Event):
    pass

class WorkflowFailedEvent(Event):
    error: str

class AdvancedWorkflow(Workflow):
    def __init__(self, settings, db: Session, workfow_service: WorkflowService, blog_service: BlogService, social_media_service: SocialMediaService, image_service: ImageService, timeout=None, verbose=None):
        super().__init__(timeout, verbose)
        self.settings = settings
        self.db = db
        self.workflow_service = workfow_service
        self.blog_service = blog_service
        self.social_media_service = social_media_service
        self.image_service = image_service

    @step
    async def start_event(self, ev: StartEvent, ctx: Context) -> ResearchEvent | BlogEvent | WorkflowFailedEvent:
        ctx.write_event_to_stream(ProgressEvent(msg="Starting content creation workflow"))
        workflow_data = WorkflowCreate(status=WorkflowStatusType.INPROGRESS)
        try:
            workflow = await self.workflow_service.create_workflow(workflow_data=workflow_data)
            await ctx.set(key="workflow_id", value=workflow.id)
            await ctx.set(key="workflow_guid", value=workflow.guid)
            if ev.research:
                return ResearchEvent(topic=ev.topic, research=ev.research)
            return BlogEvent(topic=ev.topic, research=ev.research, research_material="None")
        except Exception as e:
            return WorkflowFailedEvent(error=f"{e}")

    @step
    async def research_event(self, ev: ResearchEvent, ctx: Context) -> BlogEvent | WorkflowFailedEvent:
        ctx.write_event_to_stream(ProgressEvent(msg=f"Searching the internet for information about {ev.topic}"))
        try:
            search_input = TavilySearchInput(
                query=ev.topic,
                max_results=3,
                search_depth="basic"
            )
            research_material = await tavily_search(search_input, api_key=self.settings.TAVILY_SEARCH_API_KEY)
            return BlogEvent(topic=ev.topic, research=ev.research, research_material=research_material)
        except Exception as e:
            return WorkflowFailedEvent(error=f"{e}")

    @step
    async def blog_event(self, ev: BlogEvent, ctx: Context) -> SocialMediaEvent | WorkflowFailedEvent:
        ctx.write_event_to_stream(ProgressEvent(msg="Writing blog post"))
        prompt_template = ""
        workflow_guid = await ctx.get("workflow_guid")
        try:
            if ev.research:
                prompt_template = BLOG_AND_RESEARCH_TEMPLATE.format(query_str=ev.topic, research=ev.research_material)
            else:
                prompt_template = BLOG_TEMPLATE.format(query_str=ev.topic)
            llm = LlamaOpenAI(model=self.settings.OPENAI_MODEL, api_key=self.settings.OPENAI_API_KEY)
            response = await llm.acomplete(prompt_template)
            blog_data = BlogCreate(title=ev.topic, content=response.text, workflow_guid=workflow_guid)
            blog_post = await self.blog_service.create_blog(blog_data=blog_data)
            await ctx.set(key="blog_id", value=blog_post.id)
            ctx.send_event(SocialMediaEvent(blog=blog_data.content))
        except Exception as e:
            return WorkflowFailedEvent(error=f"{e}")

    @step
    async def social_media_event(self, ev: SocialMediaEvent, ctx: Context) -> SocialMediaCompleteEvent | IllustratorEvent | WorkflowFailedEvent:
        ctx.write_event_to_stream(ProgressEvent(msg="Writing social media post"))
        worklflow_guid = await ctx.get("workflow_guid")
        try:
            prompt_template = LINKED_IN_TEMPLATE.format(blog_content=ev.blog)
            llm = LlamaOpenAI(model=self.settings.OPENAI_MODEL, api_key=self.settings.OPENAI_API_KEY)
            response = await llm.acomplete(prompt_template)
            sm_data = SocialMediaCreate(content=response.text, platform_type=PlatformType.LINKEDIN, workflow_guid=worklflow_guid)
            sm_post = await self.social_media_service.create_social_media_post(social_media_data=sm_data)
            await ctx.set(key="sm_id", value=sm_post.id)
            ctx.send_event(IllustratorEvent(blog=ev.blog))
            return SocialMediaCompleteEvent()
        except Exception as e:
            return WorkflowFailedEvent(error=f"{e}")

    @step
    async def illustration_event(self, ev: IllustratorEvent, ctx: Context) -> IllustratorCompleteEvent | WorkflowFailedEvent:
        ctx.write_event_to_stream(ProgressEvent(msg="Drawing illustration for content"))
        try:
            llm = LlamaOpenAI(model=self.settings.OPENAI_MODEL, api_key=self.settings.OPENAI_API_KEY)
            image_prompt_instructions_generator = IMAGE_GENERATION_TEMPLATE.format(blog_post=ev.blog)
            image_prompt = await llm.acomplete(image_prompt_instructions_generator, formatted=True)
            openai_client = OpenAI(api_key=self.settings.OPENAI_API_KEY)
            file_name = await self.image_service.generate_and_upload_image(bucket=self.settings.MINIO_BUCKET_NAME, openai_client=openai_client, image_prompt=image_prompt.text)
            url = f"{self.settings.MINIO_ENDPOINT}/{self.settings.MINIO_BUCKET_NAME}/{file_name}"
            image_data = ImageCreate(url=url)
            image = await self.image_service.create_image(image_data=image_data)
            await ctx.set("image_guid", image.guid)
            return IllustratorCompleteEvent(url=url)
        except Exception as e:
            return WorkflowFailedEvent(error=f"{e}")

    @step
    async def step_workflow_success(self, ev: SocialMediaCompleteEvent | IllustratorCompleteEvent, ctx: Context) -> StopEvent | WorkflowFailedEvent:
        if (
            ctx.collect_events(
                ev,
                [SocialMediaCompleteEvent, IllustratorCompleteEvent]
            ) is None
        ):
            return None
        workflow_id = await ctx.get("workflow_id")
        image_guid = await ctx.get("image_guid")
        blog_id = await ctx.get("blog_id")
        sm_id = await ctx.get("sm_id")
        workflow_update_data = WorkflowUpdate(id=workflow_id, status=WorkflowStatusType.COMPLETE)
        blog_update_data = BlogUpdate(id=blog_id, image_guid=image_guid)
        sm_update_data = SocialMediaUpdate(id=sm_id, image_guid=image_guid)
        try:
            await self.workflow_service.update_workflow(workflow_id, workflow_update_data)
            await self.blog_service.update_blog(blog_id=blog_id, blog_data=blog_update_data)
            await self.social_media_service.update_social_media_post(sm_id=sm_id, sm_data=sm_update_data)
            return StopEvent(result="Done")
        except Exception as e:
            return WorkflowFailedEvent(error=f"{e}")

    @step
    async def step_workflow_failed(self, ev: WorkflowFailedEvent, ctx: Context) -> StopEvent:
        try:
            workflow_id = await ctx.get("workflow_id")
            workflow_update_data = WorkflowUpdate(id=workflow_id, status=WorkflowStatusType.FAILED)
            await self.workflow_service.update_workflow(workflow_id, workflow_update_data)
            return StopEvent(result="Failed")
        except:
            logger.error(ev.error)
            return StopEvent(result="Failed")
Our workflow has largely stayed the same, aside from using services to handle the logic of managing entities in Postgres and Minio. We also added a failure step in case anything goes wrong. Note that the social media step now triggers the illustration step. This avoids the same session performing two operations concurrently and causing a collision. In the future, we could pass in a session factory to enable parallel processing, but I didn't for the sake of time.
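If the social media and illustration steps ever need to run in parallel, one option (sketched here under that assumption, not part of the current code) is to inject the session factory instead of a single session, so each unit of work opens its own short-lived session:
# sketch: give each unit of work its own session via the factory
from src.core.database import AsyncSessionFactory
from src.content.repositories.images.images_repository import ImageRepository

async def create_image_in_own_session(image_data):
    # each call opens (and closes) its own AsyncSession, so two steps
    # running concurrently never share a session
    async with AsyncSessionFactory() as session:
        return await ImageRepository(session).create_image(image_data)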
In Domain-Driven Design (DDD), tactical patterns are design guidelines that help organize and structure code within a bounded context. We implement the repository and service patterns described above in our code base. The patterns are repeatable, so as before, I show one example, in this case for images, since they include extra logic for generating and uploading images.
First, we define an images interface that the ImageRepository must implement:
from abc import ABC, abstractmethod
from src.content.schemas.images import ImageCreate, Image

class IImagesRepository(ABC):
    @abstractmethod
    async def create_image(self, image_data: ImageCreate) -> Image:
        pass
Then we define the ImageRepository:
from src.content.repositories.images.images_repository_interface import IImagesRepository
from src.content.schemas.images import Image, ImageCreate
from src.content.models.images import Images
from sqlalchemy.ext.asyncio import AsyncSession

class ImageRepository(IImagesRepository):
    def __init__(self, async_db_session: AsyncSession):
        self.db = async_db_session

    async def create_image(self, image_data) -> Image:
        if not isinstance(image_data, ImageCreate):
            raise ValueError("Expected an instance of ImageCreate")
        try:
            new_image_data = Images(url=image_data.url)
            self.db.add(new_image_data)
            await self.db.commit()
            await self.db.refresh(new_image_data)
            return Image(
                id=new_image_data.id,
                guid=new_image_data.guid,
                url=new_image_data.url
            )
        except:
            await self.db.rollback()
            raise
The interface defines one create_image method. The ImageRepository implements this method by checking whether the data passed in matches the Pydantic schema ImageCreate. If it is a valid payload, it creates a new Images instance, passing in the URL generated from the Minio upload (see the service file). The id, guid, created_at, and updated_at fields are auto-generated. It then uses the session methods to add and commit the entity to the database. The image is returned to the caller for further processing. In this case, the guid is used to map the relationship between the blog_posts and social_media_posts image_guid fields.
Finally, we define the ImageService:
import uuid
import requests
from io import BytesIO
from src.content.repositories.images.images_repository import ImageRepository
from src.content.schemas.images import ImageCreate, Image as ImageSchema
from src.content.models.images import Images
from PIL import Image as PilImage
from botocore.exceptions import NoCredentialsError, ParamValidationError
from tenacity import retry, wait_random_exponential, stop_after_attempt
from openai import OpenAI
from src.core.logs import logger

class ImageService(ImageRepository):
    def __init__(self, image_repository: ImageRepository, s3_client):
        self.repository = image_repository
        self.s3_client = s3_client

    async def create_image(self, image_data: ImageCreate) -> ImageSchema:
        image = await self.repository.create_image(image_data)
        return ImageSchema.model_validate(image)

    @retry(wait=wait_random_exponential(min=1, max=15), stop=stop_after_attempt(3))
    async def generate_image(self, client: OpenAI, prompt: str):
        try:
            response = client.images.generate(
                model="dall-e-3",  # will make configurable in the future
                prompt=prompt,
                size="1024x1024",  # will make configurable in the future
                quality="standard",  # will make configurable in the future
                n=1
            )
            return response
        except:
            raise Exception("Failed to generate image")

    async def generate_and_upload_image(self, bucket, openai_client: OpenAI, image_prompt):
        try:
            generated_image = await self.generate_image(client=openai_client, prompt=image_prompt)
            image_url = generated_image.data[0].url
            response = requests.get(image_url)
            image = PilImage.open(BytesIO(response.content))
            image_bytes = BytesIO()
            image.save(image_bytes, format='PNG')
            image_bytes.seek(0)
            file_name = f"test_{uuid.uuid4()}.png"
            await self.upload_to_minio(bucket, image_bytes.getvalue(), file_name)
            return file_name
        except:
            raise Exception("Failed to upload to Minio or create database entry")

    async def upload_to_minio(self, bucket, file_data, filename):
        try:
            self.s3_client.put_object(Bucket=bucket, Key=filename, Body=file_data)
            logger.info(msg=f"Uploaded {filename} to MinIO bucket successfully!")
        except NoCredentialsError:
            logger.error(msg="Credentials not available.")
        except ParamValidationError as e:
            logger.error(msg=f"Parameter validation failed: {e}")
The ImageService class defines four methods:
- create_image: uses the ImageRepository to save images to the database.
- generate_image: uses the OpenAI API and dall-e-3 to generate an image. It is wrapped with the tenacity retry decorator to perform exponential backoff. The image generation API tends to be finicky, and this retries when errors are thrown for no apparent reason other than issues on the API's end.
- generate_and_upload_image: calls the generate_image and upload_to_minio methods to create an image based on the blog post and upload the generated image to Minio.
- upload_to_minio: uploads data to Minio using the s3_client.
These layers of abstraction allow us to separate application logic from business logic (services) and data access (repositories), which makes maintenance and testing easier.
💡 Some may argue that these patterns hurt performance. The impact is usually negligible and not a cause for concern for small to mid-size applications. In my opinion, the improved maintainability is worth it until a performance refactor is justifiable.
We will run Postgres and Minio in Docker containers to experiment with them. Later in the series, I'll show my home lab setup, where I run Postgres in Kubernetes and Minio on my NAS (using TrueNAS applications).
model: "3"
providers:
postgres:
picture: postgres:16
hostname: postgres
ports:
- "5432:5432"
setting:
POSTGRES_USER: postgres-user # EXAMPLE ONLY CHANGE
POSTGRES_PASSWORD: postgres-pw #EXAMPLE ONLY CHANGE
POSTGRES_DB: demo
volumes:
- ./information:/var/lib/postgresql/information
- ./infrastructure/init-db.sh:/docker-entrypoint-initdb.d/init-db.sh:ro
minio:
picture: quay.io/minio/minio
command: server /information --console-address :9001
restart: unless-stopped
ports:
- "9000:9000"
- "9001:9001"
setting:
MINIO_ACCESS_KEY: minioadmin # EXAMPLE CHANGE
MINIO_SECRET_KEY: minioadmin # EXAMPLE ONLY CHANGE
MINIO_ROOT_USER: minioadmin # EXAMPLE ONLY CHANGE
MINIO_ROOT_PASSWORD: minioadmin #EXAMPLE ONLY CHANGE
volumes:
- minio_data:/information
volumes:
pgdata:
minio_data:
For each service, we define:
- image: postgres:16 | quay.io/minio/minio
- (optional) command
- ports
- environment: list of environment variables
- volumes: used to persist data in the local directory
To start these containers, ensure you have the Docker runtime installed and run the following command:
docker-compose up -d
This will launch Postgres and Minio in the background.
You can check that the services are running correctly using:
docker ps
Look for output similar to:
In this post, we enhanced our FastAPI application with real-time communication capabilities and improved data management. Key improvements include integrating WebSocket connections to stream progress events through a workflow, introducing SQLAlchemy and Alembic for database interactions and migrations with Postgres, expanding our code structure using Domain-Driven Design patterns to separate core and content functionality, and improving our dependency management with pyproject.toml and pip-tools.
Thanks for tuning in to part 3 of the GenAI Content Creator series. In part 4, we will add a user interface, allow users to interact with the workflow throughout its lifecycle, and let them update their content using a rich markdown editor.