In this post, we will demonstrate how to use parallel execution to speed up a couple of common Labelbox workflows. Please note these scripts target Labelbox SDK version 59.0 and may be outdated in later releases. I will try to update this post as new features are released, but this is not a guarantee.
Please feel free to modify these scripts to your needs, but use them at your own risk.
Upload Data Rows to Labelbox
This method uses multithreading to speed up uploading data rows to the platform. To use this function, you will need a list of data row dictionaries. Please reference these documents for more information. I recommend using no more than four threads to avoid overwhelming the platform and receiving errors.
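For reference, each entry in the assets list is a plain dictionary. Here is a minimal sketch with hypothetical URLs and global keys (the `row_data` / `global_key` field names follow the Labelbox data row format):

```python
# A minimal sketch of the `assets` list — URLs and global keys are hypothetical
assets = [
    {
        "row_data": "https://example.com/images/image-1.jpg",  # public or signed URL
        "global_key": "image-1",  # unique identifier for the data row
    },
    {
        "row_data": "https://example.com/images/image-2.jpg",
        "global_key": "image-2",
    },
]
```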
import labelbox as lb
from concurrent.futures import ThreadPoolExecutor


def upload_data_rows_threading(
        assets: list[dict[str, str]],
        dataset: lb.Dataset,
        batch_size: int,
        workers: int = 4) -> list[lb.Task]:
    """Upload data rows to Labelbox in parallel

    Args:
        assets (list[dict[str, str]]): List of data row dictionaries
        dataset (lb.Dataset): Labelbox dataset
        batch_size (int): Number of data rows per parallel operation
        workers (int): Number of threads used

    Returns:
        tasks (list[lb.Task]): List of Labelbox task objects
    """
    # Divide data rows into batches
    payload = [assets[i:i + batch_size] for i in range(0, len(assets), batch_size)]

    # Callback function used with ThreadPoolExecutor
    def threading_callback(batch: list[dict[str, str]]):
        task = None
        try:
            task = dataset.create_data_rows(batch)
            task.wait_till_done(1800, 6)
            if task.errors:
                print(task.errors)
        except Exception as error:
            print(error)
        return task

    with ThreadPoolExecutor(max_workers=workers) as executor:
        results: list[lb.Task] = list(executor.map(threading_callback, payload))
    return results
When utilizing this script, you can view your upload progress inside the UI by navigating to the notification center. Each thread is represented by a task; these tasks are returned as a list of task objects that you can iterate through for more information. If you hit your API limit, adjust the parameters of the task.wait_till_done() method.
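The batch-splitting expression inside the function can be sanity-checked on its own. A quick sketch with ten dummy assets (the values are hypothetical):

```python
# Same slicing pattern as in upload_data_rows_threading
assets = [{"global_key": f"item-{i}"} for i in range(10)]
batch_size = 4

payload = [assets[i:i + batch_size] for i in range(0, len(assets), batch_size)]

# 10 assets with a batch size of 4 yield batches of 4, 4, and 2
print([len(batch) for batch in payload])  # [4, 4, 2]
```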
Download Segment Mask Files from Labelbox URL
Export Mask URLs
First, we need to get our mask URLs from our project. This script is a general demonstration of retrieving the mask URLs by exporting your data rows from a project and appending them to a list. You may need to modify this script depending on how your labels are set up. Please reference these documents for an overview of masks.
import labelbox as lb

client = lb.Client("<API Key>")
project = client.get_project("<Project Id>")

export = project.export_v2()
export.wait_till_done()
export_json = export.result

mask_urls = []
for data_row in export_json:
    labels = data_row["projects"][project.uid]["labels"]
    for label in labels:
        objects = label["annotations"]["objects"]
        for obj in objects:
            if "mask" in obj:
                mask_urls.append(obj["mask"]["url"])
Download Mask Images
Once we have our mask URLs in a list, we can pass them to our parallel operation. In this script, we use multiprocessing, since you will likely modify the images, which can be CPU-intensive.
The processing callback must live in a separate Python file (here, processing.py, matching the import below) and be imported into our processing operation.
Processing Callback
from PIL import Image
from uuid import uuid4
import labelbox as lb
import urllib.request

client = lb.Client("<API Key>")


def processing_callback(mask_url: str):
    # Fetch the mask with authenticated headers and open it with Pillow
    req = urllib.request.Request(mask_url, headers=client.headers)
    image = Image.open(urllib.request.urlopen(req))
    # Save as PNG: masks are served as PNGs, and JPEG's lossy
    # compression would corrupt the mask values
    image.save(f"./image-{uuid4()}.png")
Processing Operation
from concurrent.futures import ProcessPoolExecutor
import multiprocessing

from processing import processing_callback

if __name__ == "__main__":
    # Leave two cores free for the rest of the system, but keep at least one worker
    num_workers = max(1, multiprocessing.cpu_count() - 2)

    with ProcessPoolExecutor(num_workers) as pool:
        pool.map(processing_callback, mask_urls)