Proposed Pull Request Change

title description author ms.author ms.reviewer ms.subservice ms.devlang ms.topic ms.date ms.custom
Quickstart: Create an Azure Data Factory using Python Use a data factory to copy data from one location in Azure Blob storage to another location. whhender whhender susabat data-movement python quickstart 02/13/2025 ['devx-track-python', 'mode-api', 'sfi-ropc-nochange']
📄 Document Links
GitHub View on GitHub Microsoft Learn View on Microsoft Learn
Raw New Markdown
Generating updated version of doc...
Rendered New Markdown
Generating updated version of doc...
+0 -0
+0 -0
--- title: 'Quickstart: Create an Azure Data Factory using Python' description: Use a data factory to copy data from one location in Azure Blob storage to another location. author: whhender ms.author: whhender ms.reviewer: susabat ms.subservice: data-movement ms.devlang: python ms.topic: quickstart ms.date: 02/13/2025 ms.custom: - devx-track-python - mode-api - sfi-ropc-nochange --- # Quickstart: Create a data factory and pipeline using Python [!INCLUDE[appliesto-adf-xxx-md](includes/appliesto-adf-xxx-md.md)] In this quickstart, you create a data factory by using Python. The pipeline in this data factory copies data from one folder to another folder in Azure Blob storage. Azure Data Factory is a cloud-based data integration service that allows you to create data-driven workflows for orchestrating and automating data movement and data transformation. Using Azure Data Factory, you can create and schedule data-driven workflows, called pipelines. Pipelines can ingest data from disparate data stores. Pipelines process or transform data by using compute services such as Azure HDInsight Hadoop, Spark, Azure Data Lake Analytics, and Azure Machine Learning. Pipelines publish output data to data stores such as Azure Synapse Analytics for business intelligence (BI) applications. ## Prerequisites * An Azure account with an active subscription. [Create one for free](https://azure.microsoft.com/pricing/purchase-options/azure-account?cid=msft_learn). * [Python 3.6+](https://www.python.org/downloads/). * [An Azure Storage account](../storage/common/storage-account-create.md). * [Azure Storage Explorer](https://storageexplorer.com/) (optional). * [An application in Microsoft Entra ID](../active-directory/develop/howto-create-service-principal-portal.md#register-an-application-with-azure-ad-and-create-a-service-principal). Create the application by following the steps in this link, using Authentication Option 2 (application secret), and assign the application to the **Contributor** role by following instructions in the same article. Make note of the following values as shown in the article to use in later steps: **Application (client) ID, client secret value, and tenant ID.** ## Create and upload an input file 1. Launch Notepad. Copy the following text and save it as **input.txt** file on your disk. ```text John|Doe Jane|Doe ``` 2. Use tools such as [Azure Storage Explorer](https://storageexplorer.com/) to create the **adfv2tutorial** container, and **input** folder in the container. Then, upload the **input.txt** file to the **input** folder. ## Install the Python package 1. Open a terminal or command prompt with administrator privileges.  2. First, install the Python package for Azure management resources: ```python pip install azure-mgmt-resource ``` 3. To install the Python package for Data Factory, run the following command: ```python pip install azure-mgmt-datafactory ``` The [Python SDK for Data Factory](https://github.com/Azure/azure-sdk-for-python) supports Python 2.7 and 3.6+. 4. To install the Python package for Azure Identity authentication, run the following command: ```python pip install azure-identity ``` > [!NOTE] > The "azure-identity" package might have conflicts with "azure-cli" on some common dependencies. If you meet any authentication issue, remove "azure-cli" and its dependencies, or use a clean machine without installing "azure-cli" package to make it work. > For Sovereign clouds, you must use the appropriate cloud-specific constants. Please refer to [Connect to all regions using Azure libraries for Python Multi-cloud | Microsoft Docs for instructions to connect with Python in Sovereign clouds.](/azure/developer/python/sdk/azure-sdk-sovereign-domain) ## Create a data factory client 1. Create a file named **datafactory.py**. Add the following statements to add references to namespaces. ```python from azure.identity import ClientSecretCredential from azure.mgmt.resource import ResourceManagementClient from azure.mgmt.datafactory import DataFactoryManagementClient from azure.mgmt.datafactory.models import * from datetime import datetime, timedelta import time ``` 2. Add the following functions that print information. ```python def print_item(group): """Print an Azure object instance.""" print("\tName: {}".format(group.name)) print("\tId: {}".format(group.id)) if hasattr(group, 'location'): print("\tLocation: {}".format(group.location)) if hasattr(group, 'tags'): print("\tTags: {}".format(group.tags)) if hasattr(group, 'properties'): print_properties(group.properties) def print_properties(props): """Print a ResourceGroup properties instance.""" if props and hasattr(props, 'provisioning_state') and props.provisioning_state: print("\tProperties:") print("\t\tProvisioning State: {}".format(props.provisioning_state)) print("\n\n") def print_activity_run_details(activity_run): """Print activity run details.""" print("\n\tActivity run details\n") print("\tActivity run status: {}".format(activity_run.status)) if activity_run.status == 'Succeeded': print("\tNumber of bytes read: {}".format(activity_run.output['dataRead'])) print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten'])) print("\tCopy duration: {}".format(activity_run.output['copyDuration'])) else: print("\tErrors: {}".format(activity_run.error['message'])) ``` 3. Add the following code to the **Main** method that creates an instance of DataFactoryManagementClient class. You use this object to create the data factory, linked service, datasets, and pipeline. You also use this object to monitor the pipeline run details. Set **subscription_id** variable to the ID of your Azure subscription. For a list of Azure regions in which Data Factory is currently available, select the regions that interest you on the following page, and then expand **Analytics** to locate **Data Factory**: [Products available by region](https://azure.microsoft.com/global-infrastructure/services/). The data stores (Azure Storage, Azure SQL Database, etc.) and computes (HDInsight, etc.) used by data factory can be in other regions. ```python def main(): # Azure subscription ID subscription_id = '<subscription ID>' # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group rg_name = '<resource group>' # The data factory name. It must be globally unique. df_name = '<factory name>' # Specify your Active Directory client ID, client secret, and tenant ID credentials = ClientSecretCredential(client_id='<Application (client) ID>', client_secret='<client secret value>', tenant_id='<tenant ID>') # Specify following for Sovereign Clouds, import right cloud constant and then use it to connect. # from msrestazure.azure_cloud import AZURE_PUBLIC_CLOUD as CLOUD # credentials = DefaultAzureCredential(authority=CLOUD.endpoints.active_directory, tenant_id=tenant_id) resource_client = ResourceManagementClient(credentials, subscription_id) adf_client = DataFactoryManagementClient(credentials, subscription_id) rg_params = {'location':'westus'} df_params = {'location':'westus'} ``` ## Create a data factory Add the following code to the **Main** method that creates a **data factory**. If your resource group already exists, comment out the first `create_or_update` statement. ```python # create the resource group # comment out if the resource group already exits resource_client.resource_groups.create_or_update(rg_name, rg_params) #Create a data factory df_resource = Factory(location='westus') df = adf_client.factories.create_or_update(rg_name, df_name, df_resource) print_item(df) while df.provisioning_state != 'Succeeded': df = adf_client.factories.get(rg_name, df_name) time.sleep(1) ``` ## Create a linked service Add the following code to the **Main** method that creates an **Azure Storage linked service**. You create linked services in a data factory to link your data stores and compute services to the data factory. In this quickstart, you only need create one Azure Storage linked service as both copy source and sink store, named "AzureStorageLinkedService" in the sample. Replace `<storageaccountname>` and `<storageaccountkey>` with name and key of your Azure Storage account. ```python # Create an Azure Storage linked service ls_name = 'storageLinkedService001' # IMPORTANT: specify the name and key of your Azure Storage account. storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=<suffix>') ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string)) ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage) print_item(ls) ``` ## Create datasets In this section, you create two datasets: one for the source and the other for the sink. ### Create a dataset for source Azure Blob Add the following code to the Main method that creates an Azure blob dataset. For information about properties of Azure Blob dataset, see [Azure blob connector](connector-azure-blob-storage.md#dataset-properties) article. You define a dataset that represents the source data in Azure Blob. This Blob dataset refers to the Azure Storage linked service you create in the previous step. ```python # Create an Azure blob dataset (input) ds_name = 'ds_in' ds_ls = LinkedServiceReference(type="LinkedServiceReference",reference_name=ls_name) blob_path = '<container>/<folder path>' blob_filename = '<file name>' ds_azure_blob = DatasetResource(properties=AzureBlobDataset( linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename)) ds = adf_client.datasets.create_or_update( rg_name, df_name, ds_name, ds_azure_blob) print_item(ds) ``` ### Create a dataset for sink Azure Blob Add the following code to the Main method that creates an Azure blob dataset. For information about properties of Azure Blob dataset, see [Azure blob connector](connector-azure-blob-storage.md#dataset-properties) article. You define a dataset that represents the source data in Azure Blob. This Blob dataset refers to the Azure Storage linked service you create in the previous step. ```python # Create an Azure blob dataset (output) dsOut_name = 'ds_out' output_blobpath = '<container>/<folder path>' dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath)) dsOut = adf_client.datasets.create_or_update( rg_name, df_name, dsOut_name, dsOut_azure_blob) print_item(dsOut) ``` ## Create a pipeline Add the following code to the **Main** method that creates a **pipeline with a copy activity**. ```python # Create a copy activity act_name = 'copyBlobtoBlob' blob_source = BlobSource() blob_sink = BlobSink() dsin_ref = DatasetReference(reference_name=ds_name) dsOut_ref = DatasetReference(reference_name=dsOut_name) copy_activity = CopyActivity(name=act_name,inputs=[dsin_ref], outputs=[dsOut_ref], source=blob_source, sink=blob_sink) #Create a pipeline with the copy activity #Note1: To pass parameters to the pipeline, add them to the json string params_for_pipeline shown below in the format { “ParameterName1” : “ParameterValue1” } for each of the parameters needed in the pipeline. #Note2: To pass parameters to a dataflow, create a pipeline parameter to hold the parameter name/value, and then consume the pipeline parameter in the dataflow parameter in the format @pipeline().parameters.parametername. p_name = 'copyPipeline' params_for_pipeline = {} p_name = 'copyPipeline' params_for_pipeline = {} p_obj = PipelineResource(activities=[copy_activity], parameters=params_for_pipeline) p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj) print_item(p) ``` ## Create a pipeline run Add the following code to the **Main** method that **triggers a pipeline run**. ```python # Create a pipeline run run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={}) ``` ## Monitor a pipeline run To monitor the pipeline run, add the following code the **Main** method: ```python # Monitor the pipeline run time.sleep(30) pipeline_run = adf_client.pipeline_runs.get( rg_name, df_name, run_response.run_id) print("\n\tPipeline run status: {}".format(pipeline_run.status)) filter_params = RunFilterParameters( last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1)) query_response = adf_client.activity_runs.query_by_pipeline_run( rg_name, df_name, pipeline_run.run_id, filter_params) print_activity_run_details(query_response.value[0]) ``` Now, add the following statement to invoke the **main** method when the program is run: ```python # Start the main method main() ``` ## Full script Here is the full Python code: ```python from azure.identity import ClientSecretCredential from azure.mgmt.resource import ResourceManagementClient from azure.mgmt.datafactory import DataFactoryManagementClient from azure.mgmt.datafactory.models import * from datetime import datetime, timedelta import time def print_item(group): """Print an Azure object instance.""" print("\tName: {}".format(group.name)) print("\tId: {}".format(group.id)) if hasattr(group, 'location'): print("\tLocation: {}".format(group.location)) if hasattr(group, 'tags'): print("\tTags: {}".format(group.tags)) if hasattr(group, 'properties'): print_properties(group.properties) def print_properties(props): """Print a ResourceGroup properties instance.""" if props and hasattr(props, 'provisioning_state') and props.provisioning_state: print("\tProperties:") print("\t\tProvisioning State: {}".format(props.provisioning_state)) print("\n\n") def print_activity_run_details(activity_run): """Print activity run details.""" print("\n\tActivity run details\n") print("\tActivity run status: {}".format(activity_run.status)) if activity_run.status == 'Succeeded': print("\tNumber of bytes read: {}".format(activity_run.output['dataRead'])) print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten'])) print("\tCopy duration: {}".format(activity_run.output['copyDuration'])) else: print("\tErrors: {}".format(activity_run.error['message'])) def main(): # Azure subscription ID subscription_id = '<subscription ID>' # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group rg_name = '<resource group>' # The data factory name. It must be globally unique. df_name = '<factory name>' # Specify your Active Directory client ID, client secret, and tenant ID credentials = ClientSecretCredential(client_id='<service principal ID>', client_secret='<service principal key>', tenant_id='<tenant ID>') resource_client = ResourceManagementClient(credentials, subscription_id) adf_client = DataFactoryManagementClient(credentials, subscription_id) rg_params = {'location':'westus'} df_params = {'location':'westus'} # create the resource group # comment out if the resource group already exits resource_client.resource_groups.create_or_update(rg_name, rg_params) # Create a data factory df_resource = Factory(location='westus') df = adf_client.factories.create_or_update(rg_name, df_name, df_resource) print_item(df) while df.provisioning_state != 'Succeeded': df = adf_client.factories.get(rg_name, df_name) time.sleep(1) # Create an Azure Storage linked service ls_name = 'storageLinkedService001' # IMPORTANT: specify the name and key of your Azure Storage account. storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=<suffix>') ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string)) ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage) print_item(ls) # Create an Azure blob dataset (input) ds_name = 'ds_in' ds_ls = LinkedServiceReference(type="LinkedServiceReference",reference_name=ls_name) blob_path = '<container>/<folder path>' blob_filename = '<file name>' ds_azure_blob = DatasetResource(properties=AzureBlobDataset( linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename)) ds = adf_client.datasets.create_or_update( rg_name, df_name, ds_name, ds_azure_blob) print_item(ds) # Create an Azure blob dataset (output) dsOut_name = 'ds_out' output_blobpath = '<container>/<folder path>' dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath)) dsOut = adf_client.datasets.create_or_update( rg_name, df_name, dsOut_name, dsOut_azure_blob) print_item(dsOut) # Create a copy activity act_name = 'copyBlobtoBlob' blob_source = BlobSource() blob_sink = BlobSink() dsin_ref = DatasetReference(reference_name=ds_name) dsOut_ref = DatasetReference(reference_name=dsOut_name) copy_activity = CopyActivity(name=act_name, inputs=[dsin_ref], outputs=[ dsOut_ref], source=blob_source, sink=blob_sink) # Create a pipeline with the copy activity p_name = 'copyPipeline' params_for_pipeline = {} p_obj = PipelineResource( activities=[copy_activity], parameters=params_for_pipeline) p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj) print_item(p) # Create a pipeline run run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={}) # Monitor the pipeline run time.sleep(30) pipeline_run = adf_client.pipeline_runs.get( rg_name, df_name, run_response.run_id) print("\n\tPipeline run status: {}".format(pipeline_run.status)) filter_params = RunFilterParameters( last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1)) query_response = adf_client.activity_runs.query_by_pipeline_run( rg_name, df_name, pipeline_run.run_id, filter_params) print_activity_run_details(query_response.value[0]) # Start the main method main() ``` ## Run the code Build and start the application, then verify the pipeline execution. The console prints the progress of creating data factory, linked service, datasets, pipeline, and pipeline run. Wait until you see the copy activity run details with data read/written size. Then, use tools such as [Azure Storage explorer](https://azure.microsoft.com/features/storage-explorer/) to check the blob(s) is copied to "outputBlobPath" from "inputBlobPath" as you specified in variables. Here is the sample output: ```console Name: <data factory name> Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name> Location: eastus Tags: {} Name: storageLinkedService Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/linkedservices/storageLinkedService Name: ds_in Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_in Name: ds_out Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_out Name: copyPipeline Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/pipelines/copyPipeline Pipeline run status: Succeeded Datetime with no tzinfo will be considered UTC. Datetime with no tzinfo will be considered UTC. Activity run details Activity run status: Succeeded Number of bytes read: 18 Number of bytes written: 18 Copy duration: 4 ``` ## Clean up resources To delete the data factory, add the following code to the program: ```python adf_client.factories.delete(rg_name, df_name) ``` ## Related content The pipeline in this sample copies data from one location to another location in an Azure blob storage. Go through the [tutorials](tutorial-copy-data-dot-net.md) to learn about using Data Factory in more scenarios.
Success! Branch created successfully. Create Pull Request on GitHub
Error: