perf: small fix for the insert-method of pgvector #1004

Closed · wants to merge 1 commit
50 changes: 38 additions & 12 deletions memgpt/agent_store/db.py
@@ -458,26 +458,52 @@ def query(self, query: str, query_vec: List[float], top_k: int = 10, filters: Op
         records = [result.to_record() for result in results]
         return records

+    def get_optimal_chunk_size(self, num_columns: int) -> int:
+        """
+        Compute the largest insert chunk size that keeps the total number of
+        bind parameters (num_columns per row) under pg8000's struct-pack limit,
+        then step down to the nearest multiple of 100.
+
+        Currently, pg8000 allows at most 32767 %s parameters per statement.
+
+        Usage:
+            get_optimal_chunk_size(4)  # 8100
+            get_optimal_chunk_size(7)  # 4600
+        """
+        import math
+
+        PG8000_STRUCT_PACK_LIMIT = 32767
+        hard_limit = math.ceil(PG8000_STRUCT_PACK_LIMIT / num_columns)
+        # Step down to the nearest multiple of 100
+        chunk_size = hard_limit - (hard_limit % 100)
+        return chunk_size

     def insert_many(self, records: List[RecordType], exists_ok=True, show_progress=False):
         from sqlalchemy.dialects.postgresql import insert

         # TODO: this is terrible, should eventually be done the same way for all types (migrate to SQLModel)
         if len(records) == 0:
             return
         if isinstance(records[0], Passage):
+            # Get the number of columns in the table
+            num_columns = len(self.db_model.__table__.columns)
+            # Get the optimal chunk size for the number of columns
+            chunk_size = self.get_optimal_chunk_size(num_columns)
+            # Split the records into the largest possible chunks
+            chunks = [records[i:i + chunk_size] for i in range(0, len(records), chunk_size)]
+
             with self.engine.connect() as conn:
-                db_records = [vars(record) for record in records]
-                # print("records", db_records)
-                stmt = insert(self.db_model.__table__).values(db_records)
-                # print(stmt)
-                if exists_ok:
-                    upsert_stmt = stmt.on_conflict_do_update(
-                        index_elements=["id"], set_={c.name: c for c in stmt.excluded}  # Replace with your primary key column
-                    )
-                    print(upsert_stmt)
-                    conn.execute(upsert_stmt)
-                else:
-                    conn.execute(stmt)
+                # Execute one insert statement per chunk
+                for chunk in chunks:
+                    db_records = [vars(record) for record in chunk]
+                    stmt = insert(self.db_model.__table__).values(db_records)
+                    if exists_ok:
+                        upsert_stmt = stmt.on_conflict_do_update(
+                            index_elements=["id"], set_={c.name: c for c in stmt.excluded}  # Replace with your primary key column
+                        )
+                        conn.execute(upsert_stmt)
+                    else:
+                        conn.execute(stmt)
                 conn.commit()
         else:
             with self.session_maker() as session:
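A quick standalone check of the chunk-size math (a sketch; the helper is reproduced outside the class for illustration): each chunk's total bind-parameter count stays under pg8000's 32767 cap.

```python
import math

PG8000_STRUCT_PACK_LIMIT = 32767  # pg8000's cap on %s parameters per statement

def get_optimal_chunk_size(num_columns: int) -> int:
    # Rows per statement, stepped down to the nearest multiple of 100
    hard_limit = math.ceil(PG8000_STRUCT_PACK_LIMIT / num_columns)
    return hard_limit - (hard_limit % 100)

for cols in (4, 7, 12):
    chunk = get_optimal_chunk_size(cols)
    assert chunk * cols <= PG8000_STRUCT_PACK_LIMIT
    print(f"{cols} columns -> chunk size {chunk} ({chunk * cols} parameters)")
```

Stepping down to a multiple of 100 trades a few hundred unused parameter slots per statement for round batch sizes; note the formula assumes realistically sized tables, since for several hundred columns it can step down to zero.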
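For context, here is the chunked-upsert pattern in isolation — a minimal sketch assuming SQLAlchemy 2.x with the pg8000 driver; the `passages` table, its columns, and the connection string are placeholders for illustration, not MemGPT's actual schema:

```python
from sqlalchemy import Column, MetaData, String, Table, Text, create_engine
from sqlalchemy.dialects.postgresql import insert

# Hypothetical two-column table for illustration; MemGPT's real model is self.db_model
metadata = MetaData()
passages = Table(
    "passages", metadata,
    Column("id", String, primary_key=True),
    Column("text", Text),
)

engine = create_engine("postgresql+pg8000://user:pass@localhost/db")  # placeholder DSN
metadata.create_all(engine)

rows = [{"id": str(i), "text": f"passage {i}"} for i in range(20_000)]
chunk_size = 16300  # get_optimal_chunk_size(2): ceil(32767 / 2) = 16384, stepped down to 16300

with engine.connect() as conn:
    for i in range(0, len(rows), chunk_size):
        stmt = insert(passages).values(rows[i:i + chunk_size])
        # ON CONFLICT (id) DO UPDATE: overwrite existing rows with the incoming values
        upsert = stmt.on_conflict_do_update(
            index_elements=["id"],
            set_={c.name: c for c in stmt.excluded},
        )
        conn.execute(upsert)
    conn.commit()
```

Without the chunking, a single 20,000-row VALUES clause would bind 40,000 parameters and fail at pg8000's struct-pack step, which is exactly the failure this PR works around.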