Data Management

This chapter introduces how to use the SDK for data management operations, including creating, querying, updating, and deleting data.

  • ✅ Data creation and storage
  • ✅ Data query and filtering
  • ✅ Data update and deletion
  • ✅ Batch operation support
  • ✅ Streaming data processing
from omniedge import OmniEdge

client = OmniEdge(api_key="sk-your-api-key-here")

# Create single data record
response = client.data.create(
    collection="users",
    data={
        "name": "John Doe",
        "email": "john@example.com",
        "age": 30
    }
)
print(f"Created data ID: {response.id}")
# Query all data
all_data = client.data.list(collection="users")

# Query with conditions
filtered_data = client.data.search(
    collection="users",
    filter={
        "age": {"$gte": 18}
    }
)

# Paginated query
paged_data = client.data.list(
    collection="users",
    limit=10,
    offset=20
)
# Update single record
response = client.data.update(
    collection="users",
    id="user-id-here",
    data={
        "email": "newemail@example.com",
        "updated_at": "2024-01-01T00:00:00Z"
    }
)
print(f"Update status: {response.status}")

# Delete single record
response = client.data.delete(
    collection="users",
    id="user-id-here"
)
print(f"Delete status: {response.deleted}")
# Batch delete
response = client.data.delete_many(
    collection="users",
    filter={
        "status": "inactive"
    }
)
print(f"Deleted record count: {response.deleted_count}")
# Batch create data
records = [
    {"name": "User 1", "email": "user1@example.com"},
    {"name": "User 2", "email": "user2@example.com"},
    {"name": "User 3", "email": "user3@example.com"}
]
response = client.data.create_many(
    collection="users",
    data=records
)
print(f"Successfully created {len(response.ids)} records")
# Batch update data
response = client.data.update_many(
    collection="users",
    filter={"status": "active"},
    data={"last_login": "2024-01-01T00:00:00Z"}
)
print(f"Successfully updated {response.updated_count} records")
# Stream processing large amounts of data
stream = client.data.stream(
    collection="logs",
    filter={"level": "error"},
    batch_size=100
)
for batch in stream:
    for record in batch:
        # Process each record
        process_record(record)

# Stream create data
def data_generator():
    for i in range(1000):
        yield {
            "id": f"record_{i}",
            "data": f"content_{i}",
            "timestamp": "2024-01-01T00:00:00Z"
        }

response = client.data.create_stream(
    collection="bulk_data",
    data_generator=data_generator()
)
print(f"Successfully created {response.total_created} records")
from omniedge.exceptions import APIError, NotFoundError

try:
    # Try to operate on data
    response = client.data.create(
        collection="users",
        data={"email": "invalid-email"}  # This will trigger a validation error
    )
except NotFoundError:
    # Catch the more specific error first, in case it subclasses APIError
    print("Collection does not exist")
except APIError as e:
    print(f"API Error: {e.message}")
    if e.status_code == 400:
        print("Data validation failed")
    elif e.status_code == 409:
        print("Data already exists")
# Use transactions to ensure data consistency
with client.transaction() as tx:
    try:
        # Create user
        user = tx.data.create("users", {"name": "John"})
        # Create user config
        tx.data.create("user_configs", {
            "user_id": user.id,
            "theme": "dark"
        })
        # Commit transaction
        tx.commit()
    except Exception as e:
        # Roll back transaction
        tx.rollback()
        print(f"Transaction failed: {e}")
# Create index
client.data.create_index(
    collection="users",
    fields=["email"],
    unique=True
)

# View indexes
indexes = client.data.list_indexes(collection="users")
for index in indexes:
    print(f"Index: {index.name}")

# Delete index
client.data.drop_index(
    collection="users",
    index_name="email_1"
)
# Enable detailed logging
import logging
logging.basicConfig(level=logging.DEBUG)

# Monitor operation performance
import time

start_time = time.time()
response = client.data.list("users", limit=1000)
end_time = time.time()
print(f"Query time: {end_time - start_time:.2f} seconds")
print(f"Returned record count: {len(response.data)}")
  1. Use appropriate batch sizes

    • Batch operations improve throughput, but overly large batches can cause memory issues
    • A batch size between 100 and 1,000 is recommended; see the chunking sketch after the batch create example earlier in this chapter
  2. Use indexes judiciously

    • Create indexes for fields you query frequently
    • Avoid over-indexing, which hurts write performance
  3. Error handling and retries

    • Implement an exponential backoff retry mechanism, as shown in the sketch after this list
    • Distinguish between temporary and permanent errors
  4. Data validation

    • Perform basic data validation on the client side, as in the sketch after the error-handling example above
    • Treat server-side validation rules as the source of truth
  5. Monitoring and performance optimization

    • Record performance metrics for key operations
    • Regularly check for slow queries and unusual resource usage
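A minimal retry sketch for practice 3, assuming (as in the error-handling example above) that APIError exposes a status_code; which codes count as temporary is an assumption here:

import random
import time

from omniedge.exceptions import APIError

RETRYABLE = {429, 500, 502, 503, 504}  # assumed-temporary status codes

def with_retries(operation, max_attempts=5, base_delay=0.5):
    # Retry an operation with exponential backoff plus a little jitter
    for attempt in range(max_attempts):
        try:
            return operation()
        except APIError as e:
            if e.status_code not in RETRYABLE or attempt == max_attempts - 1:
                raise  # permanent error, or out of attempts
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.1)
            time.sleep(delay)

response = with_retries(lambda: client.data.list(collection="users"))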

Here is a complete usage example:

# Synchronous Example
from omniedge import OmniEdge

with OmniEdge(
    api_key="<YOUR_BEARER_TOKEN_HERE>",
) as omni_edge:
    res = omni_edge.chat.create(
        model="openai/gpt-4o",
        messages=[{"role": "user", "content": "Hello!"}],
        route="fallback", temperature=1, top_p=1, n=1,
        presence_penalty=0, frequency_penalty=0, repetition_penalty=1,
        stream=True,  # must be True to iterate the response as an event stream
    )
    with res as event_stream:
        for event in event_stream:
            # handle event
            print(event, flush=True)

The same SDK client can also be used to make asynchronous requests; the coroutine is driven with asyncio.

# Asynchronous Example
import asyncio
from omniedge import OmniEdge

async def main():
    async with OmniEdge(
        api_key="<YOUR_BEARER_TOKEN_HERE>",
    ) as omni_edge:
        res = await omni_edge.chat.create_async(
            model="openai/gpt-4o",
            messages=[{"role": "user", "content": "Hello!"}],
            route="fallback", temperature=1, top_p=1, n=1,
            presence_penalty=0, frequency_penalty=0, repetition_penalty=1,
            stream=True,  # must be True to iterate the response as an event stream
        )
        async with res as event_stream:
            async for event in event_stream:
                # handle event
                print(event, flush=True)

asyncio.run(main())