Data Management
Overview
This chapter introduces how to use the SDK for data management operations, including creating, querying, updating, and deleting data.
Features
- ✅ Data creation and storage
- ✅ Data query and filtering
- ✅ Data update and deletion
- ✅ Batch operation support
- ✅ Streaming data processing
Basic Operations
1. Initialize Client
```python
from omniedge import OmniEdge

client = OmniEdge(api_key="sk-your-api-key-here")
```
2. Create Data
```python
# Create single data record
response = client.data.create(
    collection="users",
    data={
        "name": "John Doe",
        "email": "john@example.com",
        "age": 30
    }
)

print(f"Created data ID: {response.id}")
```
3. Query Data
```python
# Query all data
all_data = client.data.list(collection="users")

# Query with conditions
filtered_data = client.data.search(
    collection="users",
    filter={
        "age": {"$gte": 18}
    }
)

# Paginated query
paged_data = client.data.list(
    collection="users",
    limit=10,
    offset=20
)
```
4. Update Data
```python
# Update single record
response = client.data.update(
    collection="users",
    id="user-id-here",
    data={
        "email": "newemail@example.com",
        "updated_at": "2024-01-01T00:00:00Z"
    }
)

print(f"Update status: {response.status}")
```
5. Delete Data
```python
# Delete single record
response = client.data.delete(
    collection="users",
    id="user-id-here"
)

print(f"Delete status: {response.deleted}")

# Batch delete
response = client.data.delete_many(
    collection="users",
    filter={
        "status": "inactive"
    }
)

print(f"Deleted record count: {response.deleted_count}")
```
Batch Operations
Batch Create
```python
# Batch create data
records = [
    {"name": "User 1", "email": "user1@example.com"},
    {"name": "User 2", "email": "user2@example.com"},
    {"name": "User 3", "email": "user3@example.com"}
]

response = client.data.create_many(
    collection="users",
    data=records
)

print(f"Successfully created {len(response.ids)} records")
```
Batch Update
```python
# Batch update data
response = client.data.update_many(
    collection="users",
    filter={"status": "active"},
    data={"last_login": "2024-01-01T00:00:00Z"}
)

print(f"Successfully updated {response.updated_count} records")
```
Stream Processing
Stream Query
```python
# Stream-process large amounts of data
stream = client.data.stream(
    collection="logs",
    filter={"level": "error"},
    batch_size=100
)

for batch in stream:
    for record in batch:
        # Process each record
        process_record(record)
```
Stream Create
```python
# Stream-create data
def data_generator():
    for i in range(1000):
        yield {
            "id": f"record_{i}",
            "data": f"content_{i}",
            "timestamp": "2024-01-01T00:00:00Z"
        }

response = client.data.create_stream(
    collection="bulk_data",
    data_generator=data_generator()
)

print(f"Successfully created {response.total_created} records")
```
Error Handling
```python
from omniedge.exceptions import APIError, NotFoundError

try:
    # Try to operate on data
    response = client.data.create(
        collection="users",
        data={"email": "invalid-email"}  # This will trigger a validation error
    )
except NotFoundError:
    # Catch the more specific exception first so this branch is reachable
    print("Collection does not exist")
except APIError as e:
    print(f"API Error: {e.message}")
    if e.status_code == 400:
        print("Data validation failed")
    elif e.status_code == 409:
        print("Data already exists")
```
Advanced Features
Transaction Support
```python
# Use transactions to ensure data consistency
with client.transaction() as tx:
    try:
        # Create user
        user = tx.data.create("users", {"name": "John"})

        # Create user config
        tx.data.create("user_configs", {
            "user_id": user.id,
            "theme": "dark"
        })

        # Commit transaction
        tx.commit()
    except Exception as e:
        # Rollback transaction
        tx.rollback()
        print(f"Transaction failed: {e}")
```
Index Management
```python
# Create index
client.data.create_index(
    collection="users",
    fields=["email"],
    unique=True
)

# View indexes
indexes = client.data.list_indexes(collection="users")
for index in indexes:
    print(f"Index: {index.name}")

# Delete index
client.data.drop_index(
    collection="users",
    index_name="email_1"
)
```
Monitoring and Logging
```python
# Enable detailed logging
import logging
logging.basicConfig(level=logging.DEBUG)

# Monitor operation performance
import time

start_time = time.time()
response = client.data.list("users", limit=1000)
end_time = time.time()

print(f"Query time: {end_time - start_time:.2f} seconds")
print(f"Returned record count: {len(response.data)}")
```
Best Practices
- Use appropriate batch sizes
  - Batch operations can improve performance, but overly large batches may cause memory issues
  - A batch size between 100 and 1000 is recommended (see the retry sketch after this list)
- Use indexes judiciously
  - Create indexes for frequently queried fields
  - Avoid over-indexing, which hurts write performance
- Error handling and retries
  - Implement an exponential backoff retry mechanism (see the retry sketch after this list)
  - Distinguish between temporary and permanent errors
- Data validation
  - Perform basic data validation on the client side (see the validation sketch after this list)
  - Use server-side validation rules as well
- Monitoring and performance optimization
  - Record performance metrics for key operations
  - Regularly check slow queries and resource usage
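The retry sketch below combines the batch-size and retry advice. It is a minimal illustration, assuming the `create_many` call and the `APIError` exception with a `status_code` attribute shown earlier in this chapter; the chunk size, backoff delays, and the set of status codes treated as temporary are illustrative choices, not SDK defaults.

```python
# Minimal sketch: chunked batch creation with exponential backoff retries.
# create_many() and APIError are used as shown earlier in this chapter;
# batch_size, max_retries, and the "temporary" status codes are assumptions.
import time
from omniedge.exceptions import APIError

def create_in_batches(client, collection, records, batch_size=500, max_retries=5):
    for start in range(0, len(records), batch_size):
        chunk = records[start:start + batch_size]
        for attempt in range(max_retries):
            try:
                client.data.create_many(collection=collection, data=chunk)
                break
            except APIError as e:
                # Retry only errors that are likely temporary; re-raise the rest
                temporary = e.status_code in (429, 500, 502, 503, 504)
                if not temporary or attempt == max_retries - 1:
                    raise
                time.sleep(2 ** attempt)  # wait 1s, 2s, 4s, ... between attempts
```

For example, `create_in_batches(client, "users", records)` would send the `records` list in chunks of at most 500 documents and back off before retrying a failed chunk.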
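For the client-side validation point, here is a small validation sketch run before `create()`; the required fields and rules are purely illustrative examples, not something the SDK defines or enforces.

```python
# Minimal sketch: basic client-side validation before calling create().
# The field names and rules below are illustrative assumptions.
import re

EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")

def validate_user(data):
    """Return a list of problems; an empty list means the record looks OK."""
    problems = []
    if not data.get("name"):
        problems.append("name is required")
    if not EMAIL_RE.match(data.get("email", "")):
        problems.append("email is not a valid address")
    age = data.get("age")
    if age is not None and (not isinstance(age, int) or age < 0):
        problems.append("age must be a non-negative integer")
    return problems

record = {"name": "John Doe", "email": "john@example.com", "age": 30}
errors = validate_user(record)
if errors:
    print("Skipping invalid record:", errors)
else:
    client.data.create(collection="users", data=record)
```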
Example Code
Here is a complete usage example:
```python
# Synchronous Example
from omniedge import OmniEdge

with OmniEdge(
    api_key="<YOUR_BEARER_TOKEN_HERE>",
) as omni_edge:

    res = omni_edge.chat.create(
        model="openai/gpt-4o",
        messages=[],
        route="fallback",
        temperature=1,
        top_p=1,
        n=1,
        presence_penalty=0,
        frequency_penalty=0,
        repetition_penalty=1,
        stream=False,
    )

    with res as event_stream:
        for event in event_stream:
            # handle event
            print(event, flush=True)
```
The same SDK client can also be used to make asynchronous requests by importing asyncio.
```python
# Asynchronous Example
import asyncio
from omniedge import OmniEdge

async def main():
    async with OmniEdge(
        api_key="<YOUR_BEARER_TOKEN_HERE>",
    ) as omni_edge:

        res = await omni_edge.chat.create_async(
            model="openai/gpt-4o",
            messages=[],
            route="fallback",
            temperature=1,
            top_p=1,
            n=1,
            presence_penalty=0,
            frequency_penalty=0,
            repetition_penalty=1,
            stream=False,
        )

        async with res as event_stream:
            async for event in event_stream:
                # handle event
                print(event, flush=True)

asyncio.run(main())
```