Inserts, Updates, and Upserts

Now that we've looked at how to query data, let's see how to store it in the database in the first place. In each of these cases you'll build up your TableBase objects in Python and use the DBConnection to sync them directly into the database. The operations are split into separate functions based on what you intend to do with the data:

  • DBConnection.insert(): Insert a list of objects into the database (INSERT).
  • DBConnection.update(): Update a list of objects in the database (UPDATE).
  • DBConnection.upsert(): Insert or update a list of objects in the database (INSERT ... ON CONFLICT DO UPDATE).

Inserting data

Let's consider this table schema. You'll have a simple employee table with a bit of metadata about each employee.

from iceaxe import DBConnection, select, TableBase, Field

class Employee(TableBase):
    id: int | None = Field(primary_key=True, default=None)
    name: str
    age: int

We configure the id column explicitly as the primary key through its Field() properties. We also set it to None by default to signal to Iceaxe and Postgres that the column should be auto-incremented: upon insert, Postgres will automatically assign a unique integer to each row.

Define a few employees with a typical constructor. Because our TableBase builds on Pydantic, we'll validate all input arguments for you on construction. If you pass in a non-numeric string for the age, it will raise a validation error.

employees = [
    Employee(name="Marcus Holloway", age=28),
    Employee(name="Astrid Keller", age=42),
    Employee(name="Jasper Okafor", age=36),
]

employees[0].id
> None

await conn.insert(employees)

employees[0].id
> 1

Notice that before we insert the data, the id field is None. Until we call the insert method, these objects are just sitting in Python memory without any database backing. It takes our explicit insert call to sync the data to the database and assign the auto-generated primary keys back onto the instances.
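Since TableBase models validate their inputs via Pydantic, a bad constructor argument fails immediately rather than at insert time. A minimal sketch of that behavior, assuming the error surfaces as a standard pydantic ValidationError:

from pydantic import ValidationError

try:
    # age expects an integer, so a non-numeric string fails validation
    Employee(name="Typo McGee", age="not a number")
except ValidationError as exc:
    print(exc)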

Updating data

Once you have Iceaxe instances in Python, either fetched from a select or after you've made an insert, you can manipulate the attributes like you do with any regular Python object. In the background we track all of your field modifications so we can write them out when requested. When you're ready to save your changes back to the database, you can call the update method and pass it your instance.

from iceaxe import select

query = select(Employee).where(Employee.name == "Marcus Holloway")
results = await conn.exec(query)

marcus = results[0]
marcus.age = 29

await conn.update([marcus])

Before the conn.update() call, the database still holds the old value. After the update, it holds the new value, which we can confirm with a fresh select.

results = await conn.exec(query)
results[0].age
> 29

Upserting data

Sometimes you want to insert a row if it doesn't exist, or update it if it does. This is useful if you need to enforce some uniqueness constraint on your data where you only want one instance of an object but might not know at call time whether you already have one stored. In some cases you could do a SELECT followed by an INSERT to accomplish the same thing, but under high load this approach can run into race conditions where you might end up inserting a duplicate row.

This is called an upsert operation (INSERT ... ON CONFLICT DO UPDATE). Iceaxe provides a dedicated upsert method for this purpose.

Let's say we want to add some employees to our database, but we want to update their information if they already exist based on their name. We'll need to modify our table to add a unique constraint on the name field.

from iceaxe import UniqueConstraint

class Employee(TableBase):
    id: int | None = Field(primary_key=True, default=None)
    name: str
    age: int

    table_args = [
        UniqueConstraint(columns=["name"])
    ]

employees = [
    Employee(name="Marcus Holloway", age=28),
    Employee(name="Astrid Keller", age=42),
]

# Upsert based on name, updating age if the name already exists
await conn.upsert(
    employees,
    conflict_fields=(Employee.name,),
    update_fields=(Employee.age,)
)

In this example, if an employee with the name "Marcus Holloway" already exists, their age will be updated to 28. If they don't exist, a new record will be created. The conflict_fields parameter specifies which fields should be used to detect conflicts, while update_fields specifies which fields should be updated when a conflict is found.
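To see the conflict handling in action, you can upsert the same name again with a different age and read the row back. A sketch reusing the select pattern from earlier:

# "Marcus Holloway" already exists, so this updates his age instead of inserting
await conn.upsert(
    [Employee(name="Marcus Holloway", age=30)],
    conflict_fields=(Employee.name,),
    update_fields=(Employee.age,),
)

results = await conn.exec(select(Employee).where(Employee.name == "Marcus Holloway"))
results[0].age
> 30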

Returning Values

Often, you'll want to know which records were affected by the upsert operation. The returning_fields parameter allows you to specify which fields should be returned for each upserted record.

results = await conn.upsert(
    employees,
    conflict_fields=(Employee.name,),
    update_fields=(Employee.age,),
    returning_fields=(Employee.id, Employee.name)
)
for employee_id, name in results:
    print(f"Upserted employee {name} with ID {employee_id}")

The returned values can be useful for tracking which records were affected or getting automatically generated fields like primary keys.
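For instance, you can turn those returned rows into a lookup of name to primary key for later use, such as populating foreign keys. A small sketch reusing the results from the example above:

# Map each upserted employee's name to its database-assigned primary key
ids_by_name = {name: employee_id for employee_id, name in results}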

Query updates

If you have a large number of rows to update in a consistent way, you can also make use of a database-level UPDATE statement. This avoids the SQL->Python->SQL round trip of doing a fetch before your updates. We export an update query builder for this purpose. It's similar in nature to a select query, but lets you specify the column values to update.

from iceaxe import update

query = update(Employee).set(Employee.name, "Marcus Keller").where(Employee.name == "Marcus Holloway")

query.build()
> ('UPDATE "employee" SET "employee"."name" = $1 WHERE "employee"."name" = $2',
   ['Marcus Keller', 'Marcus Holloway'])
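The builder only constructs the statement; to run it, hand it to the connection the same way as a select. A sketch, assuming conn.exec() executes update queries just as it runs selects:

# Rename Marcus in a single database-side UPDATE, with no fetch round trip
await conn.exec(
    update(Employee)
    .set(Employee.name, "Marcus Keller")
    .where(Employee.name == "Marcus Holloway")
)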

Batch updates

If you're making a large number of updates at the same time, you'll want to avoid individual calls to DBConnection functions. Since each call incurs a round trip to the database, looping over single-row operations can be quite inefficient compared to what Postgres can handle in a single batched query.

All our manipulation functions (insert, update, upsert) accept a list of objects. If you can create your objects in bulk, you'll get the best performance by passing them all to the DBConnection functions. Internally we take care of optimizing these network operations for you to respect Postgres limits. Specifically:

  • Splitting up the list of table instances into batches of parameters.
  • Identifying manipulations that change the same keys and can be batched together.
  • Sending separate operations for each underlying database table.

This addresses the following Postgres limitations:

  • Postgres enforces a limit of ~32k bound parameters per query statement.
  • Prepared statements can't batch multiple statements in the same query execution.

Because of our internal ORM bookkeeping, we can optimize these network requests to issue the minimum required queries and bundle as many operations as possible into a single query execution. This makes sure your database communication is as fast as possible even during large data manipulation.
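In practice this means you can construct a large batch of objects and hand them to a single call; Iceaxe takes care of splitting it into parameter-safe chunks. A sketch:

# Build many rows in memory and insert them in one call; Iceaxe batches
# the underlying INSERT statements to stay within Postgres parameter limits.
new_hires = [
    Employee(name=f"Employee {i}", age=20 + (i % 40))
    for i in range(100_000)
]
await conn.insert(new_hires)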

Dirty objects

When you modify a local object in Python, you'll need to explicitly call conn.update() to sync the changes back to the database. If you don't, the changes will be lost when the object is garbage collected. During development and testing, it can be helpful to track these "dirty" objects - instances that have been modified but not synced to the database.

You can enable this tracking by setting the uncommitted_verbosity parameter when creating your DBConnection:

# Enable tracking with different warning levels
conn = DBConnection(
    pg_connection,
    uncommitted_verbosity="ERROR"
)

# Disable tracking (recommended for production)
conn = DBConnection(
    pg_connection,
    uncommitted_verbosity=None
)

The verbosity levels determine how aggressively the system will notify you about uncommitted changes:

  • ERROR: Logs errors when objects are modified but not committed
  • WARNING: Logs warnings when objects are modified but not committed
  • INFO: Provides informational messages about modified but uncommitted objects
  • None: Disables tracking entirely (default)

For example, with WARNING level enabled:

users = await conn.exec(select(User).where(User.id == 1))
user = users[0]
user.name = "New Name"
# Session closes without calling conn.update([user])
# WARNING: Object <User id=1> has uncommitted changes: {'name'}
# File "...", line 10, in <module>
#   user.name = "New Name"

The log message also includes the location where the object was first modified, so you can track down the offending code and determine whether the change was intentional.

Performance Considerations

It's important to note that dirty object tracking introduces non-trivial performance overhead. The system needs to:

  1. Track all modifications to objects
  2. Maintain a registry of modified objects and the location of the modification
  3. Check the state of objects when the session closes

For this reason, it's recommended to:

  • Enable tracking during development and testing to catch potential bugs
  • Set uncommitted_verbosity=None in production environments
  • Use tracking selectively when debugging specific issues
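For example, you might key the verbosity off an environment flag so tracking is active in development but disabled in production. A sketch using a hypothetical APP_ENV variable:

import os

# Hypothetical environment flag: enable dirty-object tracking outside production
verbosity = None if os.environ.get("APP_ENV") == "production" else "WARNING"

conn = DBConnection(
    pg_connection,
    uncommitted_verbosity=verbosity
)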