Skip to content

File Management

This chapter introduces how to use the SDK for file management operations, including file upload, download, deletion, and metadata management.

  • ✅ File upload (single and batch)
  • ✅ File download and streaming
  • ✅ File metadata management
  • ✅ File classification and tagging
  • ✅ Large file chunked upload
  • ✅ File compression and decompression
  • ✅ File permission control
from omniedge import OmniEdge

client = OmniEdge(api_key="sk-your-api-key-here")

# Upload a single file; the context manager guarantees the handle is closed.
with open("document.pdf", "rb") as fh:
    response = client.files.upload(
        file=fh,
        filename="document.pdf",
        content_type="application/pdf",
        # Attach free-form metadata used later for classification/filtering.
        metadata={"category": "documents", "project": "alpha"},
    )

print(f"File ID: {response.file_id}")
print(f"File URL: {response.url}")
import os

# Batch upload: push each local path one at a time.
file_paths = ["image1.jpg", "image2.jpg", "image3.jpg"]
uploaded_files = []
for file_path in file_paths:
    with open(file_path, "rb") as file:
        response = client.files.upload(
            file=file,
            # Server-side name is the basename, not the full local path.
            filename=os.path.basename(file_path),
            content_type="image/jpeg",
        )
    uploaded_files.append(response)

print(f"Successfully uploaded {len(uploaded_files)} files")
# Download a stored file to a local path.
file_id = "file-id-here"
download_path = "/path/to/download/document.pdf"

response = client.files.download(
    file_id=file_id,
    path=download_path,
)
print(f"File downloaded to: {response.path}")
print(f"File size: {response.size} bytes")
# Fetch the stored metadata for a file and echo each attribute.
file_info = client.files.get_info(file_id="file-id-here")

print(f"Filename: {file_info.filename}")
print(f"File size: {file_info.size} bytes")
print(f"Content type: {file_info.content_type}")
print(f"Created at: {file_info.created_at}")
print(f"Metadata: {file_info.metadata}")
# Delete single file
response = client.files.delete(file_id="file-id-here")
print(f"Delete status: {response.deleted}")

# Batch delete: one round trip for several IDs.
file_ids = ["file-id-1", "file-id-2", "file-id-3"]
response = client.files.delete_many(file_ids=file_ids)
print(f"Successfully deleted {response.deleted_count} files")
# Large file chunked upload
large_file_path = "/path/to/large/video.mp4"

# Step 1: open a multipart upload session on the server.
upload_session = client.files.initiate_multipart_upload(
    filename="video.mp4",
    content_type="video/mp4",
    metadata={"category": "videos"},
)

# Step 2: stream the file up one chunk at a time.
chunk_size = 5 * 1024 * 1024  # 5MB per chunk
parts = []
with open(large_file_path, "rb") as file:
    # iter() with a b"" sentinel stops exactly where "if not chunk: break" did.
    chunks = iter(lambda: file.read(chunk_size), b"")
    for part_number, chunk in enumerate(chunks, start=1):
        part_response = client.files.upload_part(
            upload_id=upload_session.upload_id,
            part_number=part_number,
            data=chunk,
        )
        # The server needs (part_number, etag) pairs to stitch parts together.
        parts.append({"part_number": part_number, "etag": part_response.etag})
        print(f"Uploaded chunk {part_number}")

# Step 3: finalize the session with the collected part list.
response = client.files.complete_multipart_upload(
    upload_id=upload_session.upload_id,
    parts=parts,
)
print(f"File upload completed, ID: {response.file_id}")
# Stream upload helper: lazily read a file in fixed-size chunks.
def file_generator(file_path, chunk_size=8192):
    """Yield successive chunks (at most *chunk_size* bytes) of the file.

    The final chunk may be shorter; nothing is yielded for an empty file.
    """
    with open(file_path, "rb") as fh:
        # iter() with a b"" sentinel terminates on EOF, like the manual
        # read/check/break loop it replaces.
        yield from iter(lambda: fh.read(chunk_size), b"")
# Start the stream upload, feeding chunks straight from the generator.
stream_response = client.files.upload_stream(
    filename="large_file.dat",
    content_type="application/octet-stream",
    data_stream=file_generator("/path/to/large_file.dat"),
)
print(f"Stream upload completed, file ID: {stream_response.file_id}")
# Compress several stored files into one zip archive on the server.
file_ids = ["file-id-1", "file-id-2", "file-id-3"]
archive_response = client.files.compress(
    file_ids=file_ids,
    format="zip",
    filename="archive.zip",
)
print(f"Archive ID: {archive_response.file_id}")

# Extract the archive we just created.
extract_response = client.files.extract(
    file_id=archive_response.file_id,
    destination_path="/path/to/extract/",
)
print(f"Extraction completed, extracted {extract_response.extracted_count} files")
# Generate image thumbnails
thumbnail_response = client.files.generate_thumbnail(
    file_id="image-file-id",
    width=200,
    height=200,
    format="jpeg",
)
print(f"Thumbnail URL: {thumbnail_response.url}")

# Get a browser-viewable preview URL for a document.
preview_url = client.files.get_preview_url(
    file_id="document-file-id",
    width=800,
    height=600,
)
print(f"Preview URL: {preview_url}")
# List all files
files = client.files.list()
for file in files:
    print(f"File: {file.filename} ({file.size} bytes)")

# Filter files by conditions (Mongo-style operators), newest first.
filtered_files = client.files.list(
    filter={
        "metadata.category": "images",
        "content_type": {"$regex": "^image/"},
    },
    sort="created_at",
    order="desc",
)
# Search files: free-text query combined with a metadata filter.
search_results = client.files.search(
    query="project alpha",
    filters={"metadata.project": "alpha"},
    limit=10,
)
for result in search_results:
    print(f"Matching file: {result.filename}")
# Add tags
client.files.add_tags(
    file_id="file-id-here",
    tags=["important", "review"],
)

# Remove tags
client.files.remove_tags(
    file_id="file-id-here",
    tags=["review"],
)

# Read back the file's current tag set.
tags = client.files.get_tags(file_id="file-id-here")
print(f"File tags: {tags}")
# Set per-action access lists on a file.
client.files.set_permissions(
    file_id="file-id-here",
    permissions={
        "read": ["user1", "user2"],
        "write": ["user1"],
        "delete": ["admin"],
    },
)

# Get file permissions
permissions = client.files.get_permissions(file_id="file-id-here")
print(f"File permissions: {permissions}")
from omniedge.exceptions import APIError, NotFoundError, PermissionError

# NOTE: the SDK's PermissionError shadows the builtin of the same name,
# so an OS-level permission failure from open() is NOT caught below —
# only the SDK's own PermissionError is.
try:
    # Try to upload the file; the with-block closes it on any outcome.
    with open("large_file.bin", "rb") as file:
        response = client.files.upload(
            file=file,
            filename="large_file.bin",
            content_type="application/octet-stream",
        )
except FileNotFoundError:
    # open() raises the builtin error; previously this propagated uncaught.
    print("Local file not found")
except NotFoundError:
    print("Upload directory does not exist")
except PermissionError:
    print("No upload permission")
except APIError as e:
    # Map the common HTTP failure codes to actionable advice.
    if e.status_code == 413:  # Payload Too Large
        print("File too large, recommend using chunked upload")
    elif e.status_code == 422:  # Unprocessable Entity
        print("File type not supported")
    else:
        print(f"Upload failed: {e.message}")
  1. File Naming Conventions

    • Use meaningful file names
    • Avoid special characters and spaces
    • Use consistent naming conventions
  2. Metadata Management

    • Add useful metadata to files
    • Use consistent metadata structure
    • Utilize metadata for file classification
  3. Large File Handling

    • Use chunked upload for large files
    • Implement resume functionality
    • Monitor upload progress
  4. Security Considerations

    • Validate file types and sizes
    • Implement file content scanning
    • Set appropriate file permissions
  5. Performance Optimization

    • Use concurrent upload for multiple files
    • Set appropriate chunk sizes
    • Utilize CDN to accelerate file access
import logging
import time

# Enable detailed logging
logging.basicConfig(level=logging.DEBUG)

# Monitor file operation performance: wall-clock the upload call.
start_time = time.time()
# BUG FIX: the original passed open(...) directly and never closed the
# handle; the with-block guarantees closure even if the upload raises.
with open("large_file.zip", "rb") as fh:
    response = client.files.upload(
        file=fh,
        filename="large_file.zip",
    )
end_time = time.time()

print(f"Upload time: {end_time - start_time:.2f} seconds")
print(f"File size: {response.size} bytes")
print(f"Upload speed: {response.size / (end_time - start_time) / 1024 / 1024:.2f} MB/s")

Here is a complete file management example:

from omniedge import OmniEdge
import os
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)


def main():
    """Run a complete file-management workflow end to end."""
    # Initialize client
    client = OmniEdge(api_key="sk-your-api-key-here")
    try:
        # Step 1 -- upload a document with classification metadata.
        print("Uploading file...")
        with open("example.pdf", "rb") as fh:
            upload_response = client.files.upload(
                file=fh,
                filename="example.pdf",
                content_type="application/pdf",
                metadata={"category": "documents", "project": "demo"},
            )
        file_id = upload_response.file_id
        print(f"File uploaded successfully, ID: {file_id}")

        # Step 2 -- read back the stored attributes.
        file_info = client.files.get_info(file_id=file_id)
        print(f"File info: {file_info.filename} ({file_info.size} bytes)")

        # Step 3 -- tag the file for later filtering.
        client.files.add_tags(file_id=file_id, tags=["demo", "important"])
        print("Tags added")

        # Step 4 -- thumbnails only apply to image content types.
        if file_info.content_type.startswith("image/"):
            thumbnail = client.files.generate_thumbnail(
                file_id=file_id,
                width=200,
                height=200,
            )
            print(f"Thumbnail URL: {thumbnail.url}")

        # Step 5 -- pull the file back down to disk.
        download_path = "/tmp/downloaded_example.pdf"
        client.files.download(file_id=file_id, path=download_path)
        print(f"File downloaded to: {download_path}")

        # Step 6 -- list the most recent files.
        files = client.files.list(limit=10)
        print(f"Recent {len(files)} files:")
        for file in files:
            print(f" - {file.filename} ({file.size} bytes)")
    except Exception as e:
        # Demo-level boundary: report the failure instead of crashing.
        print(f"Operation failed: {e}")


if __name__ == "__main__":
    main()