While Labelbox natively supports most major cloud providers, you can also work with alternatives like Nebius (a platform that provides storage and compute for model training).
In this guide we will import data from Nebius into Labelbox and work around the presigned URL expiry limit (a maximum of 7 days).
Requirements:
- A Labelbox account (API key)
- A Nebius account with a service account that has read access to your data
- The boto3 and labelbox Python packages installed (pip install boto3 labelbox)
We will assume you want to import all of your data and that file names are unique, since they will be used as global keys (global_key).
First, let's set up a Nebius service account:
IAM → Service Account → Create entity
Once created, you can add it to a custom group or grant it the same level of permissions as the pre-existing editors group.
Create a key pair and save it securely.
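If you prefer not to hard-code the key pair in your scripts (the snippets below do so only for brevity), one option is to export it as environment variables and read it with os.environ. A minimal sketch, assuming the hypothetical variable names NEBIUS_ACCESS_KEY_ID and NEBIUS_SECRET_ACCESS_KEY:

import os
import boto3

# Read the Nebius service-account key pair from environment variables
# (variable names are placeholders -- use whatever you saved the key under).
s3_client = boto3.client(
    service_name='s3',
    endpoint_url='https://storage.eu-west1.nebius.cloud',
    aws_access_key_id=os.environ['NEBIUS_ACCESS_KEY_ID'],
    aws_secret_access_key=os.environ['NEBIUS_SECRET_ACCESS_KEY']
)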
Before uploading, make sure to set a CORS policy so that the data can be accessed from Labelbox:
import boto3
import os

s3_client = boto3.client(
    service_name='s3',
    endpoint_url='https://storage.eu-west1.nebius.cloud',
    aws_access_key_id='<SERVICE_ACCOUNT_KEY_ID>',
    aws_secret_access_key='<SERVICE_ACCOUNT_ACCESS_KEY>'
)

bucket_name = 'white-stingray-bucket-6'

# CORS configuration for Labelbox
cors_configuration = {
    'CORSRules': [{
        'AllowedHeaders': ['*'],
        'AllowedMethods': ['GET', 'HEAD'],
        'AllowedOrigins': [
            'https://app.labelbox.com',
            'https://editor.labelbox.com'
        ],
        'ExposeHeaders': [],
    }]
}

try:
    s3_client.put_bucket_cors(
        Bucket=bucket_name,
        CORSConfiguration=cors_configuration
    )
    print(f"✓ CORS configured for bucket: {bucket_name}")

    # Verify CORS settings
    response = s3_client.get_bucket_cors(Bucket=bucket_name)
    print(f"Current CORS rules: {response['CORSRules']}")
except Exception as e:
    print(f"Error configuring CORS: {e}")
Once that's done, let's get your data into Labelbox:
import boto3
import labelbox as lb

# Connect to Nebius
s3_client = boto3.client(
    service_name='s3',
    endpoint_url='https://storage.eu-west1.nebius.cloud',
    aws_access_key_id='<SERVICE_ACCOUNT_KEY_ID>',
    aws_secret_access_key='<SERVICE_ACCOUNT_ACCESS_KEY>'
)

# List all files in your Nebius bucket
# Note: list_objects_v2 returns at most 1,000 keys per call;
# use a paginator if your bucket is larger than that.
bucket_name = '<YOUR_BUCKET_NAME>'
response = s3_client.list_objects_v2(Bucket=bucket_name)
files = [obj['Key'] for obj in response.get('Contents', [])]
print(f"Found {len(files)} files in Nebius")

# Generate data rows with signed Nebius URLs
data_rows = []
for file_key in files:
    # Skip only folder markers (entries ending with /)
    if file_key.endswith('/'):
        print(f"Skipping folder marker: {file_key}")
        continue

    # Generate signed URL - spaces in file_key are handled automatically
    url = s3_client.generate_presigned_url(
        'get_object',
        Params={'Bucket': bucket_name, 'Key': file_key},
        ExpiresIn=604800  # 7 days in seconds (the maximum)
    )

    # Get the filename (last part of the path) and remove the extension
    filename = file_key.split('/')[-1]
    global_key = filename.rsplit('.', 1)[0] if '.' in filename else filename

    data_rows.append({
        "row_data": url,
        "global_key": global_key
    })

print(f"Processing {len(data_rows)} actual files")

# Import to Labelbox
lb_client = lb.Client(api_key='<LABELBOX_API_KEY>')
dataset = lb_client.create_dataset(name="<DATASET_NAME>", iam_integration=None)
task = dataset.create_data_rows(data_rows)
task.wait_till_done()

print(f"Imported {len(data_rows)} files to Labelbox")
print(f"Dataset ID: {dataset.uid}")
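If anything goes wrong during the import, the returned task object carries the details. A minimal sketch of checking it after wait_till_done() (the exact shape of the error output can vary by SDK version):

# Inspect the import task for per-row errors
if task.errors:
    print(f"Import completed with errors: {task.errors}")
else:
    print("Import completed without errors")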
Seven days is the maximum lifetime of a presigned URL, which is a limitation for longer-running projects, so we can run a cron job that periodically refreshes the URLs.
Here is a script that can do that (an example schedule follows the script).
def regenerate_urls(dataset_id, bucket_name, prefix='', file_extension=''):
    """
    Regenerate signed URLs for all data rows in a dataset.
    Works without metadata by parsing the S3 key out of the current URL,
    falling back to reconstructing it from the data row's external_id.

    Args:
        dataset_id: Labelbox dataset ID
        bucket_name: Your Nebius bucket name
        prefix: Optional key prefix used when reconstructing keys from external_ids
        file_extension: Optional file extension (e.g. '.wav') used in the same fallback
    """
    import boto3
    import labelbox as lb
    from urllib.parse import urlparse, unquote

    # Initialize clients
    s3_client = boto3.client(
        service_name='s3',
        endpoint_url='https://storage.eu-west1.nebius.cloud',
        aws_access_key_id='<SERVICE_ACCOUNT_KEY_ID>',
        aws_secret_access_key='<SERVICE_ACCOUNT_ACCESS_KEY>'
    )
    lb_client = lb.Client(api_key='<LABELBOX_API_KEY>')

    dataset = lb_client.get_dataset(dataset_id)
    print(f"Regenerating URLs for dataset: {dataset.name}")

    data_rows = list(dataset.data_rows())
    print(f"Found {len(data_rows)} data rows")

    updated_count = 0
    failed_count = 0

    for i, data_row in enumerate(data_rows, 1):
        try:
            # Method 1: Try to parse the existing URL to extract the S3 key
            current_url = data_row.row_data
            s3_key = None
            if current_url and 'storage' in current_url:
                parsed = urlparse(current_url)
                path_parts = parsed.path.strip('/').split('/', 1)
                if len(path_parts) >= 2:
                    # path_parts[0] is the bucket, path_parts[1] is the key
                    s3_key = unquote(path_parts[1])

            # Method 2: Reconstruct from external_id if parsing failed
            if not s3_key:
                s3_key = f"{prefix}{data_row.external_id}{file_extension}"

            # Generate a new signed URL (7 days)
            new_url = s3_client.generate_presigned_url(
                'get_object',
                Params={'Bucket': bucket_name, 'Key': s3_key},
                ExpiresIn=604800
            )

            # Update the data row
            data_row.update(row_data=new_url)
            updated_count += 1
        except Exception as e:
            failed_count += 1
            print(f"[{i}/{len(data_rows)}] Failed {data_row.external_id}: {e}")

    print(f"\n{'='*60}")
    print(f"Successfully updated: {updated_count}/{len(data_rows)}")
    if failed_count > 0:
        print(f"Failed: {failed_count}/{len(data_rows)}")
    print(f"{'='*60}")
# Usage example
if __name__ == '__main__':
    import sys

    if len(sys.argv) < 2:
        print("Usage: python refresh_urls.py <dataset_id>")
        sys.exit(1)

    dataset_id = sys.argv[1]

    # Configuration
    BUCKET_NAME = 'white-stingray-bucket-6'

    regenerate_urls(
        dataset_id=dataset_id,
        bucket_name=BUCKET_NAME,
    )
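To keep the URLs from ever expiring, schedule the script to run more often than every 7 days. As a minimal sketch, assuming the script is saved as refresh_urls.py and you are using cron (the interpreter path, script path, dataset ID, and log location are all placeholders to adapt):

0 0 */5 * * /usr/bin/python3 /path/to/refresh_urls.py <DATASET_ID> >> /var/log/refresh_urls.log 2>&1

This runs at midnight on every fifth day of the month, so consecutive runs are never more than about six days apart, safely inside the 7-day expiry window.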

