class deeplake.core.transform.Pipeline(functions: List[ComputeFunction])
eval(data_in, ds_out: Optional[Dataset] = None, num_workers: int = 0, scheduler: str = 'threaded', progressbar: bool = True, skip_ok: bool = False, check_lengths: bool = True, pad_data_in: bool = False, read_only_ok: bool = False, cache_size: int = 16, checkpoint_interval: int = 0, ignore_errors: bool = False, verbose: bool = True, **kwargs)

Evaluates the pipeline on data_in to produce an output dataset ds_out.

  • data_in – Input passed to the transform to generate output dataset. Should support __getitem__ and __len__. Can be a Deep Lake dataset.

  • ds_out (Dataset, optional) –

    • The dataset object to which the transform will get written. If this is not provided, data_in will be overwritten if it is a Deep Lake dataset, otherwise error will be raised.

    • It should have all keys being generated in output already present as tensors. It’s initial state should be either:

    • Empty, i.e., all tensors have no samples. In this case all samples are added to the dataset.

    • All tensors are populated and have same length. In this case new samples are appended to the dataset.

  • num_workers (int) – The number of workers to use for performing the transform. Defaults to 0. When set to 0, it will always use serial processing, irrespective of the scheduler.

  • scheduler (str) – The scheduler to be used to compute the transformation. Supported values include: ‘serial’, ‘threaded’, ‘processed’ and ‘ray’. Defaults to ‘threaded’.

  • progressbar (bool) – Displays a progress bar if True (default).

  • skip_ok (bool) – If True, skips the check for output tensors generated. This allows the user to skip certain tensors in the function definition. This is especially useful for inplace transformations in which certain tensors are not modified. Defaults to False.

  • check_lengths (bool) – If True, checks whether ds_out has tensors of same lengths initially.

  • pad_data_in (bool) – If True, pads tensors of data_in to match the length of the largest tensor in data_in. Defaults to False.

  • read_only_ok (bool) – If True and output dataset is same as input dataset, the read-only check is skipped. Defaults to False.

  • cache_size (int) – Cache size to be used by transform per worker.

  • checkpoint_interval (int) – If > 0, the transform will be checkpointed with a commit every checkpoint_interval input samples to avoid restarting full transform due to intermitten failures. If the transform is interrupted, the intermediate data is deleted and the dataset is reset to the last commit. If <= 0, no checkpointing is done. Checkpoint interval should be a multiple of num_workers if num_workers > 0. Defaults to 0.

  • ignore_errors (bool) – If True, input samples that causes transform to fail will be skipped and the errors will be ignored if possible.

  • verbose (bool) – If True, prints additional information about the transform.

  • **kwargs – Additional arguments.

  • InvalidInputDataError – If data_in passed to transform is invalid. It should support __getitem__ and __len__ operations. Using scheduler other than “threaded” with deeplake dataset having base storage as memory as data_in will also raise this.

  • InvalidOutputDatasetError – If all the tensors of ds_out passed to transform don’t have the same length. Using scheduler other than “threaded” with deeplake dataset having base storage as memory as ds_out will also raise this.

  • TensorMismatchError – If one or more of the outputs generated during transform contain different tensors than the ones present in ‘ds_out’ provided to transform.

  • UnsupportedSchedulerError – If the scheduler passed is not recognized. Supported values include: ‘serial’, ‘threaded’, ‘processed’ and ‘ray’.

  • TransformError – All other exceptions raised if there are problems while running the pipeline.

  • ValueError – If num_workers > 0 and checkpoint_interval is not a multiple of num_workers or if checkpoint_interval > 0 and ds_out is None.

# noqa: DAR401


def my_fn(sample_in: Any, samples_out, my_arg0, my_arg1=0):
    samples_out.my_tensor.append(my_arg0 * my_arg1)

# This transform can be used using the eval method in one of these 2 ways:-

# Directly evaluating the method
# here arg0 and arg1 correspond to the 3rd and 4th argument in my_fn
my_fn(arg0, arg1).eval(data_in, ds_out, scheduler="threaded", num_workers=5)

# As a part of a Transform pipeline containing other functions
pipeline = deeplake.compose([my_fn(a, b), another_function(x=2)])
pipeline.eval(data_in, ds_out, scheduler="processed", num_workers=2)


pad_data_in is only applicable if data_in is a Deep Lake dataset.