How to: Parallel Operations with Labelbox SDK

In this post, we will demonstrate several ways to use parallel methods to speed up a couple of common Labelbox workflows. Please note these scripts use Labelbox SDK version 59.0 and may become outdated in later releases. I will try to update this post as new features are released, but this is not a guarantee.

Please feel free to modify these scripts to your needs, but use them at your own risk.

Upload Data Rows to Labelbox

This method uses multithreading to speed up getting data rows onto the platform. To use this function, you will need a list of created data row dictionaries; please reference these documents for more information. I recommend not going past four threads, to avoid overwhelming the platform and receiving errors.
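For reference, here is a minimal sketch of what such a list of data row dictionaries can look like (the URLs and global keys below are made up; see the Labelbox docs for all supported fields):

```python
# Hypothetical list of data row dictionaries: "row_data" points at the
# asset and "global_key" is a user-supplied unique identifier.
assets = [
    {
        "row_data": f"https://example.com/images/image-{i}.jpeg",
        "global_key": f"image-{i}",
    }
    for i in range(500)
]
```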

import labelbox as lb
from concurrent.futures import ThreadPoolExecutor

def upload_data_rows_threading(
    assets: list[dict[str, str]],
    dataset: lb.Dataset,
    batch_size: int,
    workers: int = 4) -> list[lb.Task]:
    """Upload data rows to Labelbox in parallel

    Args:
        assets (list[dict[str, str]]): List of data row dictionaries
        dataset (lb.Dataset): Labelbox dataset
        batch_size (int): Number of data rows per parallel operation
        workers (int): Number of threads used
    Returns:
        tasks (list[lb.Task]): List of Labelbox task objects
    """

    # Divide data rows in different batches
    payload = [assets[i:i+batch_size] for i in range(0, len(assets), batch_size)]

    # Callback function used with ThreadPoolExecutor
    def threading_callback(assets: list[dict[str, str]]):
        try:
            task = dataset.create_data_rows(assets)
            task.wait_till_done(1800, 6)

            if task.errors:
                print(task.errors)

            return task
        except Exception as error:
            # A bare "return task" here would raise NameError if
            # create_data_rows itself failed, so report and return None
            print(error)
            return None

    with ThreadPoolExecutor(max_workers=workers) as executor:
        results: list[lb.Task] = list(executor.map(threading_callback, payload))
    
    return results

When using this script, you can view your upload progress inside the UI by navigating to the notification center. Each thread is represented by a task; these tasks are returned as a list of task objects that you can iterate through for more information. If you hit your API rate limit, adjust the parameters of the task.wait_till_done() method.
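To see how the function divides the work into batches, here is the same slicing comprehension run on a small dummy list:

```python
# Same batching pattern as in upload_data_rows_threading, on dummy data
assets = [{"global_key": f"image-{i}"} for i in range(10)]
batch_size = 4

payload = [assets[i:i + batch_size] for i in range(0, len(assets), batch_size)]

print([len(batch) for batch in payload])  # → [4, 4, 2]
```

Each inner list becomes one create_data_rows call, so batch_size controls how many data rows each thread uploads at a time.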

Download Segment Mask Files from Labelbox URL

Export Mask URLs

First, we need to get our mask URLs from our project. This script is a general demonstration of retrieving the mask URLs by exporting your data rows from a project and appending the URLs to a list. You may need to modify this script depending on how your labels are set up. Please reference these documents for an overview of masks.

import labelbox as lb

client = lb.Client("<API Key>")

project = client.get_project("<Project Id>")

export = project.export_v2()
export.wait_till_done()
export_json = export.result

mask_urls = []

for data_row in export_json:
    labels = data_row["projects"][project.uid]["labels"]
    for label in labels:
        objects = label["annotations"]["objects"]
        for obj in objects:
            if "mask" in obj:
                mask_urls.append(
                    obj["mask"]["url"]
                )
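The nested loop can be checked against a minimal mock export payload (the structure mirrors what the loop expects; all field values are made up):

```python
# Mock export payload mirroring the structure the extraction loop expects
project_uid = "proj-1"  # hypothetical project id
export_json = [
    {
        "projects": {
            project_uid: {
                "labels": [
                    {
                        "annotations": {
                            "objects": [
                                # Only objects with a "mask" key contribute a URL
                                {"name": "car", "mask": {"url": "https://example.com/masks/1"}},
                                {"name": "tree", "bounding_box": {"top": 0, "left": 0}},
                            ]
                        }
                    }
                ]
            }
        }
    }
]

mask_urls = []
for data_row in export_json:
    labels = data_row["projects"][project_uid]["labels"]
    for label in labels:
        for obj in label["annotations"]["objects"]:
            if "mask" in obj:
                mask_urls.append(obj["mask"]["url"])

print(mask_urls)  # → ['https://example.com/masks/1']
```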

Download Mask Images

Once we have our mask URLs in a list, we can pass them to our parallel operation. In this script, we are using multiprocessing since you will likely modify the images, which can be processing-intensive.

The processing callback has to live in a separate Python file and be imported into our processing operation, so that worker processes can pickle and import it.

Processing Callback

from PIL import Image
from uuid import uuid4
import labelbox as lb
import urllib.request

client = lb.Client("<API Key>")

def processing_callback(mask_url: str):
    # Fetch the mask with authenticated headers and save it with Pillow.
    # Save as PNG so the mask stays lossless; JPEG compression would
    # blur the mask boundaries.
    req = urllib.request.Request(mask_url, headers=client.headers)
    image = Image.open(urllib.request.urlopen(req))
    image.save(f"./image-{uuid4()}.png")

Processing Operation

from concurrent.futures import ProcessPoolExecutor
import multiprocessing
from processing import processing_callback

# Leave two CPU cores free for other work, but keep at least one worker
num_workers = max(1, multiprocessing.cpu_count() - 2)

# The __main__ guard is required on platforms that spawn worker processes
# (Windows and macOS); without it, each worker would re-run this script
if __name__ == "__main__":
    with ProcessPoolExecutor(num_workers) as pool:
        # Consume the iterator so exceptions raised in workers surface here
        list(pool.map(processing_callback, mask_urls))