
```python
# Standard Dataiku PySpark recipe preamble (imports and Spark context are
# assumed here; they were not shown in the original snippet)
import dataiku
from dataiku import spark as dkuspark
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

# Read recipe inputs
df = dkuspark.get_dataframe(sqlContext, dataiku.Dataset("rds_pd_fi_insurers"))

# Compute recipe outputs from inputs
renaming_dict = {
    'agg_parent_company_regulatory_code': 'agg_parent_company_ctr_code',
    'rd_ins_life_investedassetsqlty': 'rd_ins_life_investassetsqlty',
    'rd_ins_nonlife_investedassetsqlty': 'rd_ins_nonlife_investassetsqlty',
    'rd_ins_investedassetsqlty': 'rd_ins_investassetsqlty',
    'insurers_subcluster_award_criteria': 'insurers_subclusterawardcriteria'
}
for old_name in renaming_dict:
    df = df.withColumnRenamed(old_name, renaming_dict[old_name])

# Write recipe outputs
dkuspark.write_with_schema(dataiku.Dataset("shared_rds_pd_fi_insurers"), df)
```

This code snippet performs the following steps:

  1. Read recipe inputs: It starts by loading the dataset "rds_pd_fi_insurers" from Dataiku into a PySpark DataFrame (df) with the dkuspark.get_dataframe function, which takes the sqlContext object and the Dataiku dataset as arguments.

  2. Compute recipe outputs from inputs:

    • It defines a dictionary (renaming_dict) that maps specific column names in the dataset to their new names.
    • Using a for loop, it iterates over the dictionary keys (the old column names) and calls PySpark's withColumnRenamed method to rename each column in the DataFrame according to the mapping (an equivalent single-pass formulation is sketched after this list). Specifically:
      • agg_parent_company_regulatory_code is renamed to agg_parent_company_ctr_code.
      • rd_ins_life_investedassetsqlty is renamed to rd_ins_life_investassetsqlty.
      • rd_ins_nonlife_investedassetsqlty is renamed to rd_ins_nonlife_investassetsqlty.
      • rd_ins_investedassetsqlty is renamed to rd_ins_investassetsqlty.
      • insurers_subcluster_award_criteria is renamed to insurers_subclusterawardcriteria.
  3. Write recipe outputs: It writes the resulting DataFrame (df) back to the Dataiku dataset "shared_rds_pd_fi_insurers" with the dkuspark.write_with_schema function, which sets the output dataset's schema from the DataFrame and then writes the rows.
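
As a point of comparison, the same renaming can be expressed in a single select over aliased columns. This is a minimal sketch assuming the same df and renaming_dict as above; it is not part of the original recipe.

```python
from pyspark.sql import functions as F

# Equivalent single-pass rename: alias only the columns listed in
# renaming_dict and keep every other column unchanged.
renamed_df = df.select(
    [F.col(c).alias(renaming_dict.get(c, c)) for c in df.columns]
)
```

Both forms produce the same schema; the loop used in the original recipe is equally valid and arguably easier to read for a handful of columns.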

Summary

The code reads a Dataiku dataset into a PySpark DataFrame, renames specific columns using a predefined mapping, and then saves the updated DataFrame back into a new or existing Dataiku dataset. It's part of a Dataiku workflow for transforming and processing datasets.
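
If you want to confirm the renames took effect before writing, a quick check against the expected column names works. This is an illustrative addition using the same df and renaming_dict, not part of the original recipe.

```python
# Sanity check (illustrative): every new name from renaming_dict should now
# be present in the DataFrame, and no old name should remain.
missing = [new for new in renaming_dict.values() if new not in df.columns]
leftover = [old for old in renaming_dict if old in df.columns]
assert not missing and not leftover, f"Rename incomplete: missing={missing}, leftover={leftover}"
```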
