Skip to content

Catalog

Catalog

Catalog(
    id_generator: AbstractIDGenerator,
    metastore: AbstractMetastore,
    warehouse: AbstractWarehouse,
    cache_dir=None,
    tmp_dir=None,
    client_config: Optional[dict[str, Any]] = None,
    warehouse_ready_callback: Optional[
        Callable[[AbstractWarehouse], None]
    ] = None,
)
Source code in datachain/catalog/catalog.py
def __init__(
    self,
    id_generator: "AbstractIDGenerator",
    metastore: "AbstractMetastore",
    warehouse: "AbstractWarehouse",
    cache_dir=None,
    tmp_dir=None,
    client_config: Optional[dict[str, Any]] = None,
    warehouse_ready_callback: Optional[
        Callable[["AbstractWarehouse"], None]
    ] = None,
):
    datachain_dir = DataChainDir(cache=cache_dir, tmp=tmp_dir)
    datachain_dir.init()
    self.id_generator = id_generator
    self.metastore = metastore
    self._warehouse = warehouse
    self.cache = DataChainCache(datachain_dir.cache, datachain_dir.tmp)
    self.client_config = client_config if client_config is not None else {}
    self._init_params = {
        "cache_dir": cache_dir,
        "tmp_dir": tmp_dir,
    }
    self._warehouse_ready_callback = warehouse_ready_callback

cleanup_temp_tables

cleanup_temp_tables(names: Iterable[str]) -> None

Drop tables created temporarily when processing datasets.

This should be implemented even if temporary tables are used to ensure that they are cleaned up as soon as they are no longer needed. When running the same DatasetQuery multiple times we may use the same temporary table names.

Source code in datachain/catalog/catalog.py
def cleanup_temp_tables(self, names: Iterable[str]) -> None:
    """
    Drop tables created temporarily when processing datasets.

    This should be implemented even if temporary tables are used to
    ensure that they are cleaned up as soon as they are no longer
    needed. When running the same `DatasetQuery` multiple times we
    may use the same temporary table names.
    """
    self.warehouse.cleanup_temp_tables(names)
    self.id_generator.delete_uris(names)

clone

clone(
    sources: list[str],
    output: str,
    force: bool = False,
    update: bool = False,
    recursive: bool = False,
    no_glob: bool = False,
    no_cp: bool = False,
    edatachain: bool = False,
    edatachain_file: Optional[str] = None,
    ttl: int = TTL_INT,
    *,
    client_config=None
) -> None

This command takes cloud path(s) and duplicates files and folders in them into the dataset folder. It also adds those files to a dataset in database, which is created if doesn't exist yet Optionally, it creates a .edatachain file

Source code in datachain/catalog/catalog.py
def clone(
    self,
    sources: list[str],
    output: str,
    force: bool = False,
    update: bool = False,
    recursive: bool = False,
    no_glob: bool = False,
    no_cp: bool = False,
    edatachain: bool = False,
    edatachain_file: Optional[str] = None,
    ttl: int = TTL_INT,
    *,
    client_config=None,
) -> None:
    """
    This command takes cloud path(s) and duplicates files and folders in
    them into the dataset folder.
    It also adds those files to a dataset in database, which is
    created if doesn't exist yet
    Optionally, it creates a .edatachain file
    """
    if not no_cp or edatachain:
        self.cp(
            sources,
            output,
            force=force,
            update=update,
            recursive=recursive,
            no_glob=no_glob,
            edatachain_only=no_cp,
            no_edatachain_file=not edatachain,
            edatachain_file=edatachain_file,
            ttl=ttl,
            client_config=client_config,
        )
    else:
        # since we don't call cp command, which does listing implicitly,
        # it needs to be done here
        self.enlist_sources(
            sources,
            ttl,
            update,
            client_config=client_config or self.client_config,
        )

    self.create_dataset_from_sources(
        output, sources, client_config=client_config, recursive=recursive
    )

cp

cp(
    sources: list[str],
    output: str,
    force: bool = False,
    update: bool = False,
    recursive: bool = False,
    edatachain_file: Optional[str] = None,
    edatachain_only: bool = False,
    no_edatachain_file: bool = False,
    no_glob: bool = False,
    ttl: int = TTL_INT,
    *,
    client_config=None
) -> list[dict[str, Any]]

This function copies files from cloud sources to local destination directory If cloud source is not indexed, or has expired index, it runs indexing It also creates .edatachain file by default, if not specified differently

Source code in datachain/catalog/catalog.py
def cp(
    self,
    sources: list[str],
    output: str,
    force: bool = False,
    update: bool = False,
    recursive: bool = False,
    edatachain_file: Optional[str] = None,
    edatachain_only: bool = False,
    no_edatachain_file: bool = False,
    no_glob: bool = False,
    ttl: int = TTL_INT,
    *,
    client_config=None,
) -> list[dict[str, Any]]:
    """
    This function copies files from cloud sources to local destination directory
    If cloud source is not indexed, or has expired index, it runs indexing
    It also creates .edatachain file by default, if not specified differently
    """
    client_config = client_config or self.client_config
    node_groups = self.enlist_sources_grouped(
        sources,
        ttl,
        update,
        no_glob,
        client_config=client_config,
    )

    always_copy_dir_contents, copy_to_filename = prepare_output_for_cp(
        node_groups, output, force, edatachain_only, no_edatachain_file
    )
    dataset_file = check_output_dataset_file(
        output, force, edatachain_file, no_edatachain_file
    )

    total_size, total_files = collect_nodes_for_cp(node_groups, recursive)

    if total_files == 0:
        # Nothing selected to cp
        return []

    desc_max_len = max(len(output) + 16, 19)
    bar_format = (
        "{desc:<"
        f"{desc_max_len}"
        "}{percentage:3.0f}%|{bar}| {n_fmt:>5}/{total_fmt:<5} "
        "[{elapsed}<{remaining}, {rate_fmt:>8}]"
    )

    if not edatachain_only:
        with get_download_bar(bar_format, total_size) as pbar:
            for node_group in node_groups:
                node_group.download(recursive=recursive, pbar=pbar)

    instantiate_node_groups(
        node_groups,
        output,
        bar_format,
        total_files,
        force,
        recursive,
        edatachain_only,
        always_copy_dir_contents,
        copy_to_filename,
    )
    if no_edatachain_file:
        return []

    metafile_data = compute_metafile_data(node_groups)
    if metafile_data:
        # Don't write the metafile if nothing was copied
        print(f"Creating '{dataset_file}'")
        with open(dataset_file, "w", encoding="utf-8") as fd:
            yaml.dump(metafile_data, fd, sort_keys=False)

    return metafile_data

create_dataset

create_dataset(
    name: str,
    version: Optional[int] = None,
    *,
    columns: Sequence[Column],
    feature_schema: Optional[dict] = None,
    query_script: str = "",
    create_rows: Optional[bool] = True,
    validate_version: Optional[bool] = True,
    listing: Optional[bool] = False
) -> DatasetRecord

Creates new dataset of a specific version. If dataset is not yet created, it will create it with version 1 If version is None, then next unused version is created. If version is given, then it must be an unused version number.

Source code in datachain/catalog/catalog.py
def create_dataset(
    self,
    name: str,
    version: Optional[int] = None,
    *,
    columns: Sequence[Column],
    feature_schema: Optional[dict] = None,
    query_script: str = "",
    create_rows: Optional[bool] = True,
    validate_version: Optional[bool] = True,
    listing: Optional[bool] = False,
) -> "DatasetRecord":
    """
    Creates new dataset of a specific version.
    If dataset is not yet created, it will create it with version 1
    If version is None, then next unused version is created.
    If version is given, then it must be an unused version number.
    """
    assert [c.name for c in columns if c.name != "id"], f"got {columns=}"
    if not listing and Client.is_data_source_uri(name):
        raise RuntimeError(
            "Cannot create dataset that starts with source prefix, e.g s3://"
        )
    default_version = 1
    try:
        dataset = self.get_dataset(name)
        default_version = dataset.next_version
    except DatasetNotFoundError:
        schema = {
            c.name: c.type.to_dict() for c in columns if isinstance(c.type, SQLType)
        }
        dataset = self.metastore.create_dataset(
            name,
            feature_schema=feature_schema,
            query_script=query_script,
            schema=schema,
            ignore_if_exists=True,
        )

    version = version or default_version

    if dataset.has_version(version):
        raise DatasetInvalidVersionError(
            f"Version {version} already exists in dataset {name}"
        )

    if validate_version and not dataset.is_valid_next_version(version):
        raise DatasetInvalidVersionError(
            f"Version {version} must be higher than the current latest one"
        )

    return self.create_new_dataset_version(
        dataset,
        version,
        feature_schema=feature_schema,
        query_script=query_script,
        create_rows_table=create_rows,
        columns=columns,
    )

create_new_dataset_version

create_new_dataset_version(
    dataset: DatasetRecord,
    version: int,
    *,
    columns: Sequence[Column],
    sources="",
    feature_schema=None,
    query_script="",
    error_message="",
    error_stack="",
    script_output="",
    create_rows_table=True,
    job_id: Optional[str] = None,
    is_job_result: bool = False
) -> DatasetRecord

Creates dataset version if it doesn't exist. If create_rows is False, dataset rows table will not be created

Source code in datachain/catalog/catalog.py
def create_new_dataset_version(
    self,
    dataset: DatasetRecord,
    version: int,
    *,
    columns: Sequence[Column],
    sources="",
    feature_schema=None,
    query_script="",
    error_message="",
    error_stack="",
    script_output="",
    create_rows_table=True,
    job_id: Optional[str] = None,
    is_job_result: bool = False,
) -> DatasetRecord:
    """
    Creates dataset version if it doesn't exist.
    If create_rows is False, dataset rows table will not be created
    """
    assert [c.name for c in columns if c.name != "id"], f"got {columns=}"
    schema = {
        c.name: c.type.to_dict() for c in columns if isinstance(c.type, SQLType)
    }
    dataset = self.metastore.create_dataset_version(
        dataset,
        version,
        status=DatasetStatus.PENDING,
        sources=sources,
        feature_schema=feature_schema,
        query_script=query_script,
        error_message=error_message,
        error_stack=error_stack,
        script_output=script_output,
        schema=schema,
        job_id=job_id,
        is_job_result=is_job_result,
        ignore_if_exists=True,
    )

    if create_rows_table:
        table_name = self.warehouse.dataset_table_name(dataset.name, version)
        self.warehouse.create_dataset_rows_table(table_name, columns=columns)
        self.update_dataset_version_with_warehouse_info(dataset, version)

    return dataset

dataset_stats

dataset_stats(name: str, version: int) -> DatasetStats

Returns tuple with dataset stats: total number of rows and total dataset size.

Source code in datachain/catalog/catalog.py
def dataset_stats(self, name: str, version: int) -> DatasetStats:
    """
    Returns tuple with dataset stats: total number of rows and total dataset size.
    """
    dataset = self.get_dataset(name)
    dataset_version = dataset.get_version(version)
    return DatasetStats(
        num_objects=dataset_version.num_objects,
        size=dataset_version.size,
    )

get_client

get_client(uri: StorageURI, **config: Any) -> Client

Return the client corresponding to the given source uri.

Source code in datachain/catalog/catalog.py
def get_client(self, uri: StorageURI, **config: Any) -> Client:
    """
    Return the client corresponding to the given source `uri`.
    """
    config = config or self.client_config
    cls = Client.get_implementation(uri)
    return cls.from_source(uri, self.cache, **config)

get_file_signals

get_file_signals(
    dataset_name: str, dataset_version: int, row: RowDict
) -> Optional[dict]

Function that returns file signals from dataset row. Note that signal names are without prefix, so if there was 'laion__file__source' in original row, result will have just 'source' Example output: { "source": "s3://ldb-public", "parent": "animals/dogs", "name": "dog.jpg", ... }

Source code in datachain/catalog/catalog.py
def get_file_signals(
    self, dataset_name: str, dataset_version: int, row: RowDict
) -> Optional[dict]:
    """
    Function that returns file signals from dataset row.
    Note that signal names are without prefix, so if there was 'laion__file__source'
    in original row, result will have just 'source'
    Example output:
        {
            "source": "s3://ldb-public",
            "parent": "animals/dogs",
            "name": "dog.jpg",
            ...
        }
    """
    from datachain.lib.signal_schema import DEFAULT_DELIMITER, SignalSchema

    version = self.get_dataset(dataset_name).get_version(dataset_version)

    file_signals_values = {}

    schema = SignalSchema.deserialize(version.feature_schema)
    for file_signals in schema.get_file_signals():
        prefix = file_signals.replace(".", DEFAULT_DELIMITER) + DEFAULT_DELIMITER
        file_signals_values[file_signals] = {
            c_name.removeprefix(prefix): c_value
            for c_name, c_value in row.items()
            if c_name.startswith(prefix)
            and DEFAULT_DELIMITER not in c_name.removeprefix(prefix)
        }

    if not file_signals_values:
        return None

    # there can be multiple file signals in a schema, but taking the first
    # one for now. In future we might add ability to choose from which one
    # to open object
    return next(iter(file_signals_values.values()))

merge_datasets

merge_datasets(
    src: DatasetRecord,
    dst: DatasetRecord,
    src_version: int,
    dst_version: Optional[int] = None,
) -> DatasetRecord

Merges records from source to destination dataset. It will create new version of a dataset with records merged from old version and the source, unless existing version is specified for destination in which case it must be in non final status as datasets are immutable

Source code in datachain/catalog/catalog.py
def merge_datasets(
    self,
    src: DatasetRecord,
    dst: DatasetRecord,
    src_version: int,
    dst_version: Optional[int] = None,
) -> DatasetRecord:
    """
    Merges records from source to destination dataset.
    It will create new version
    of a dataset with records merged from old version and the source, unless
    existing version is specified for destination in which case it must
    be in non final status as datasets are immutable
    """
    if (
        dst_version
        and not dst.is_valid_next_version(dst_version)
        and dst.get_version(dst_version).is_final_status()
    ):
        raise DatasetInvalidVersionError(
            f"Version {dst_version} must be higher than the current latest one"
        )

    src_dep = self.get_dataset_dependencies(src.name, src_version)
    dst_dep = self.get_dataset_dependencies(
        dst.name,
        dst.latest_version,  # type: ignore[arg-type]
    )

    if dst.has_version(dst_version):  # type: ignore[arg-type]
        # case where we don't create new version, but append to the existing one
        self.warehouse.merge_dataset_rows(
            src,
            dst,
            src_version,
            dst_version=dst_version,  # type: ignore[arg-type]
        )
        merged_schema = src.serialized_schema | dst.serialized_schema
        self.update_dataset(dst, schema=merged_schema)
        self.update_dataset_version_with_warehouse_info(
            dst,
            dst_version,  # type: ignore[arg-type]
            schema=merged_schema,
        )
        for dep in src_dep:
            if dep and dep not in dst_dep:
                self.metastore.add_dependency(
                    dep,
                    dst.name,
                    dst_version,  # type: ignore[arg-type]
                )
    else:
        # case where we create new version of merged results
        src_dr = self.warehouse.dataset_rows(src, src_version)
        dst_dr = self.warehouse.dataset_rows(dst)

        merge_result_columns = list(
            {
                c.name: c for c in list(src_dr.table.c) + list(dst_dr.table.c)
            }.values()
        )

        dst_version = dst_version or dst.next_version
        dst = self.create_new_dataset_version(
            dst,
            dst_version,
            columns=merge_result_columns,
        )
        self.warehouse.merge_dataset_rows(
            src,
            dst,
            src_version,
            dst_version,
        )
        self.update_dataset_version_with_warehouse_info(dst, dst_version)
        for dep in set(src_dep + dst_dep):
            if dep:
                self.metastore.add_dependency(dep, dst.name, dst_version)

    return dst

query

query(
    query_script: str,
    envs: Optional[Mapping[str, str]] = None,
    python_executable: Optional[str] = None,
    save: bool = False,
    save_as: Optional[str] = None,
    preview_limit: int = 10,
    preview_offset: int = 0,
    preview_columns: Optional[list[str]] = None,
    capture_output: bool = True,
    output_hook: Callable[[str], None] = noop,
    params: Optional[dict[str, str]] = None,
    job_id: Optional[str] = None,
) -> QueryResult

Method to run custom user Python script to run a query and, as result, creates new dataset from the results of a query. Returns tuple of result dataset and script output.

Constraints on query script
  1. datachain.query.DatasetQuery should be used in order to create query for a dataset
  2. There should not be any .save() call on DatasetQuery since the idea is to create only one dataset as the outcome of the script
  3. Last statement must be an instance of DatasetQuery

If save is set to True, we are creating new dataset with results from dataset query. If it's set to False, we will just print results without saving anything

Example of query script

from datachain.query import DatasetQuery, C DatasetQuery('s3://ldb-public/remote/datasets/mnist-tiny/').filter( C.size > 1000 )

Source code in datachain/catalog/catalog.py
def query(
    self,
    query_script: str,
    envs: Optional[Mapping[str, str]] = None,
    python_executable: Optional[str] = None,
    save: bool = False,
    save_as: Optional[str] = None,
    preview_limit: int = 10,
    preview_offset: int = 0,
    preview_columns: Optional[list[str]] = None,
    capture_output: bool = True,
    output_hook: Callable[[str], None] = noop,
    params: Optional[dict[str, str]] = None,
    job_id: Optional[str] = None,
) -> QueryResult:
    """
    Method to run custom user Python script to run a query and, as result,
    creates new dataset from the results of a query.
    Returns tuple of result dataset and script output.

    Constraints on query script:
        1. datachain.query.DatasetQuery should be used in order to create query
        for a dataset
        2. There should not be any .save() call on DatasetQuery since the idea
        is to create only one dataset as the outcome of the script
        3. Last statement must be an instance of DatasetQuery

    If save is set to True, we are creating new dataset with results
    from dataset query. If it's set to False, we will just print results
    without saving anything

    Example of query script:
        from datachain.query import DatasetQuery, C
        DatasetQuery('s3://ldb-public/remote/datasets/mnist-tiny/').filter(
            C.size > 1000
        )
    """
    from datachain.query.dataset import ExecutionResult

    feature_file = tempfile.NamedTemporaryFile(
        dir=os.getcwd(), suffix=".py", delete=False
    )
    _, feature_module = os.path.split(feature_file.name)

    try:
        lines, proc, response_text = self.run_query(
            python_executable or sys.executable,
            query_script,
            envs,
            feature_file,
            capture_output,
            feature_module,
            output_hook,
            params,
            preview_columns,
            preview_limit,
            preview_offset,
            save,
            save_as,
            job_id,
        )
    finally:
        feature_file.close()
        os.unlink(feature_file.name)

    output = "".join(lines)

    if proc.returncode:
        if proc.returncode == QUERY_SCRIPT_CANCELED_EXIT_CODE:
            raise QueryScriptCancelError(
                "Query script was canceled by user",
                return_code=proc.returncode,
                output=output,
            )
        if proc.returncode == QUERY_SCRIPT_INVALID_LAST_STATEMENT_EXIT_CODE:
            raise QueryScriptRunError(
                "Last line in a script was not an instance of DatasetQuery",
                return_code=proc.returncode,
                output=output,
            )
        raise QueryScriptRunError(
            f"Query script exited with error code {proc.returncode}",
            return_code=proc.returncode,
            output=output,
        )

    try:
        response = json.loads(response_text)
    except ValueError:
        response = {}
    exec_result = ExecutionResult(**response)

    dataset: Optional[DatasetRecord] = None
    version: Optional[int] = None
    if save or save_as:
        dataset, version = self.save_result(
            query_script, exec_result, output, version, job_id
        )

    return QueryResult(
        dataset=dataset,
        version=version,
        output=output,
        preview=exec_result.preview,
        metrics=exec_result.metrics,
    )

register_dataset

register_dataset(
    dataset: DatasetRecord,
    version: int,
    target_dataset: DatasetRecord,
    target_version: Optional[int] = None,
) -> DatasetRecord

Registers dataset version of one dataset as dataset version of another one (it can be new version of existing one). It also removes original dataset version

Source code in datachain/catalog/catalog.py
def register_dataset(
    self,
    dataset: DatasetRecord,
    version: int,
    target_dataset: DatasetRecord,
    target_version: Optional[int] = None,
) -> DatasetRecord:
    """
    Registers dataset version of one dataset as dataset version of another
    one (it can be new version of existing one).
    It also removes original dataset version
    """
    target_version = target_version or target_dataset.next_version

    if not target_dataset.is_valid_next_version(target_version):
        raise DatasetInvalidVersionError(
            f"Version {target_version} must be higher than the current latest one"
        )

    dataset_version = dataset.get_version(version)
    if not dataset_version:
        raise ValueError(f"Dataset {dataset.name} does not have version {version}")

    if not dataset_version.is_final_status():
        raise ValueError("Cannot register dataset version in non final status")

    # copy dataset version
    target_dataset = self.metastore.create_dataset_version(
        target_dataset,
        target_version,
        sources=dataset_version.sources,
        status=dataset_version.status,
        query_script=dataset_version.query_script,
        error_message=dataset_version.error_message,
        error_stack=dataset_version.error_stack,
        script_output=dataset_version.script_output,
        created_at=dataset_version.created_at,
        finished_at=dataset_version.finished_at,
        schema=dataset_version.serialized_schema,
        num_objects=dataset_version.num_objects,
        size=dataset_version.size,
        preview=dataset_version.preview,
        job_id=dataset_version.job_id,
        is_job_result=dataset_version.is_job_result,
    )
    # to avoid re-creating rows table, we are just renaming it for a new version
    # of target dataset
    self.warehouse.rename_dataset_table(
        dataset.name,
        target_dataset.name,
        old_version=version,
        new_version=target_version,
    )
    self.metastore.update_dataset_dependency_source(
        dataset,
        version,
        new_source_dataset=target_dataset,
        new_source_dataset_version=target_version,
    )

    if dataset.id == target_dataset.id:
        # we are updating the same dataset so we need to refresh it to have newly
        # added version in step before
        dataset = self.get_dataset(dataset.name)

    self.remove_dataset_version(dataset, version, drop_rows=False)

    return self.get_dataset(target_dataset.name)

remove_dataset_version

remove_dataset_version(
    dataset: DatasetRecord,
    version: int,
    drop_rows: Optional[bool] = True,
) -> None

Deletes one single dataset version. If it was last version, it removes dataset completely

Source code in datachain/catalog/catalog.py
def remove_dataset_version(
    self, dataset: DatasetRecord, version: int, drop_rows: Optional[bool] = True
) -> None:
    """
    Deletes one single dataset version.
    If it was last version, it removes dataset completely
    """
    if not dataset.has_version(version):
        return
    dataset = self.metastore.remove_dataset_version(dataset, version)
    if drop_rows:
        self.warehouse.drop_dataset_rows_table(dataset, version)

storage_stats

storage_stats(uri: StorageURI) -> Optional[DatasetStats]

Returns tuple with storage stats: total number of rows and total dataset size.

Source code in datachain/catalog/catalog.py
def storage_stats(self, uri: StorageURI) -> Optional[DatasetStats]:
    """
    Returns tuple with storage stats: total number of rows and total dataset size.
    """
    partial_path = self.metastore.get_last_partial_path(uri)
    if partial_path is None:
        return None
    dataset = self.get_dataset(Storage.dataset_name(uri, partial_path))

    return self.dataset_stats(dataset.name, dataset.latest_version)

update_dataset

update_dataset(
    dataset: DatasetRecord, conn=None, **kwargs
) -> DatasetRecord

Updates dataset fields.

Source code in datachain/catalog/catalog.py
def update_dataset(
    self, dataset: DatasetRecord, conn=None, **kwargs
) -> DatasetRecord:
    """Updates dataset fields."""
    old_name = None
    new_name = None
    if "name" in kwargs and kwargs["name"] != dataset.name:
        old_name = dataset.name
        new_name = kwargs["name"]

    dataset = self.metastore.update_dataset(dataset, conn=conn, **kwargs)

    if old_name and new_name:
        # updating name must result in updating dataset table names as well
        for version in [v.version for v in dataset.versions]:
            self.warehouse.rename_dataset_table(
                old_name,
                new_name,
                old_version=version,
                new_version=version,
            )

    return dataset