Querying Power BI REST API using Fabric Spark SQL

Microsoft Fabric has a lot of different components which usually work very well together. However, even though Power BI is a fundamental part of Fabric, there is not really a tight integration between Data Engineering components and Power BI. In this blog post I will show you an easy and reusable way to query the Power BI REST API via Fabric SQL in a very straight forward way. The extracted data can then be stored in the data lake e.g. to create a history of your dataset refreshes, the state of your workspaces or any other information that is provided by the REST API.

To achieve this, we need to prepare a couple of things first:

  • get an access token to work with the Power BI REST API
  • expose the access token as a SQL variable
  • create a PySpark function to query the Power BI REST API
  • expose the PySpark function as a SQL user-defined function
  • use SQL to query the Power BI REST API

To get an access token for the Power BI REST API we can use mssparkutils.credentials.getToken and provide the OAuth audience for the Power BI REST API which would be https://analysis.windows.net/powerbi/api

pbi_access_token = mssparkutils.credentials.getToken("https://analysis.windows.net/powerbi/api")

We then need to make this token available in Fabric Spark SQL by storing it in a variable:

spark.sql(f"SET pbi_access_token={pbi_access_token}")

The next part is probably the most complex one. We need to write a Python function that runs a query against the Power BI REST API and returns the results in a standardized way. I will not go into too much detail but simply show the code. It basically queries the REST API via a GET request, checks if the result contains a property value with the results and then returns them as a list of items. Please check e.g. the GET Groups REST API call to better understand the structure of the result. The function further adds a new property to each item to make nesting of API calls easier as you will see in the final example.

import requests

# make sure to support different versions of the API path passed to the function
def get_api_path(path: str) -> str:
    base_path = "https://api.powerbi.com/v1.0/myorg/"
    base_items = list(filter(lambda x: x, base_path.split("/")))
    path_items = list(filter(lambda x: x, path.split("/")))

    index = path_items.index(base_items[-1]) if base_items[-1] in path_items else -1

    return base_path + "/".join(path_items[index+1:])

# call the api_path with the given token and return the list in the "value" property
def pbi_api(api_path: str, token: str) -> object:
    
    result = requests.get(get_api_path(api_path), headers = {"authorization": "Bearer " + token})

    if not result.ok:
        return [{"status_code": result.status_code, "error": result.reason}]

    json = result.json()

    if not "value" in json:
        return []

    values = json["value"]

    for value in values:
        if "id" in value:
            value["apiPath"] = f"{api_path}/{value['id']}"
        else:
            value["apiPath"] = f"{api_path}"

    return values

Once we have our Python function, we can make it accessible to Spark. In order to do this, we need to define a Spark data type that is returned by our function. To make it work with all different kinds of API calls without knowing all potential properties that might get returned, we use a map type with string keys and string values to cover all variations in the different APIs. As the result is always a list of items, we wrap our map type into an array type.
The following code exposes it to PySpark and also Spark SQL.

import pyspark.sql.functions as F
import pyspark.sql.types as T

# schema of the function output - an array of maps to make it work with all API outputs
schema = T.ArrayType(
    T.MapType(T.StringType(), T.StringType())
)

# register the function for PySpark
pbi_api_udf = F.udf(lambda api_path, token: pbi_api(api_path, token), schema)

# register the function for SparkSQL
spark.udf.register("pbi_api_udf", pbi_api_udf)

Now we are finally ready to query the Power BI REST API via Spark SQL. We need to use the magic %%sql to tell the notebook engine, we are running SQL code in this one cell. We then run our function in a simple SELECT statement and provide the API endpoint we want to query and a reference to our token-variable using the variable syntax ${variable-name}.

%%sql 
SELECT pbi_api_udf('/groups', '${pbi_access_token}') as workspaces

This will return a table with a single row and a single cell:

However, that cell contains an array which can be exploded to get our actual list of workspaces and their details:

%%sql
SELECT explode(pbi_api_udf('/groups', '${pbi_access_token}')) as workspace

Once you understood those concepts, it is pretty easy to query the Power BI REST API via SQL as this can also be combined with other Spark SQL capabilities like CTEs, e.g. to get a list of all datasets across all workspaces as shown below:

%%sql
WITH cte_workspaces AS (
    SELECT explode(pbi_api_udf('/groups', '${pbi_access_token}')) as workspace
)
SELECT workspace.name, workspace.id, pbi_api_udf(concat(workspace.apiPath, '/datasets'), '${pbi_access_token}') as datasets
FROM cte_workspaces

As you can see, to show a given property as a separate column, you can just use the dot-notation to reference it – e.g. workspace.name or workspace.id

There are endless possibilities using this solution, from easy interactive querying to historically persisting the state of your Power BI objects in your data lake!

Obviously, there are still some things that could be improved. It would be much more elegant to have a Table Valued Function instead of the scalar function that returns an array which needs to be exploded afterwards. However, this is not yet possible in Fabric but will hopefully come soon.

This technique can also be applied to any other APIs that expose data. The most challenging part is usually the authentication but Fabric’s mssparkutils.credentials make it pretty easy for us to do this.

27 Replies to “Querying Power BI REST API using Fabric Spark SQL”

  1. Pingback: Querying the Power BI REST API from Fabric Spark – Curated SQL

  2. Hi Gerhard, thanks for your post. Where do I put spark.sql(f”SET token={pbi_access_token}”) ? When I use it in a notebook with PySpark Python or SQL I get error messages.
    Regards
    Tom

    • thats Python/PySpark code
      in my example I used a Python/PySpark notebook
      whenever I used SQL, I used the %%sql notebook magic

      • I’m running into the same issue. When I run the 2nd line of code spark.sql(f”SET token={pbi_access_token}”), I get an error.

        NameError: name ‘pbi_access_token’ is not defined

        • sorry, seems like there was some copy&paste error on my side where I mixed up some variable names.
          I fixed the code and it should work again now

          • That did the trick! Thank you so much Gerhard. As a long-time Power BI user and no-time pyspark user, this is amazing.

  3. As a data science student I’m not sure how I can use this?
    to work with SQL using power BI data or vice versa?
    Any tips is appreciated. Thanks for sharing.

    • The idea is to simply query Power BI metadata which is only exposed via the REST API using simple SQL statements instead of writing the API calls on your own
      there are some use-cases for monitoring and operations I would say and for quick exploration of your PBI tenant

  4. Do you have some examples of how to use the REST API to get Datasets outside of your Org and pull them in to be used as dataflows?

    • so do you want to query the Power BI metadata from a different org or e.g. send a DAX query to a dataset that resides in a different org?

      either way, the most challenging part will probably be authentication. I dont know if this is supported out of the box.
      What you can try though is replacing /myorg/ in your API calls with //
      You will still need to get an access token though and to be honest I am not quite sure how to do that for remote tenants

      • We pull in data from other organisations, normally read account on the DB is fine but API’s are being pushed and I am really struggling with the REST API and how I can use it for those DBs that will not give me DB access and want me to use API.

        Thanks for the ideas, i will give it a play

        • guess this very much differs from what this blog post is about
          if I understood correctly, you want to connect from Power BI to those new REST APIs provided by the other organisations. Then you need to stick to the authentication their REST APIs provide

  5. Looking at the above I am thinking it must also be possible to get the activity events in this way, but I do not really understand what I need to change to the code to access these, can you maybe explain? Would be much appreciated!

  6. I am so sorry, above comment lost identation and also lost a mention to insert Darren Gosbell’s code to be able to call admin APIs, removing only the “pip install” line because it resets the session. Darren Gosbell’s code would be inserted above that cell:
    spark.sql(f”SET pbi_admin_token={access_token}”)

    • can you update your other comment to also include Darren Gosbell’s code so its easier for others to use it
      I think its not quit clear which part of his code exactly you are referring to

  7. Great! Thank you very much!

    If anyone is interested, please see code below to call activityevents with pagination. Each — represents one notebook cell. Replace [tab] with identation.

    %pip install azure.identity

    pbi_access_token = mssparkutils.credentials.getToken(“https://analysis.windows.net/powerbi/api”)
    spark.sql(f”SET pbi_access_token={pbi_access_token}”)

    #function pbi_api modified to handle other arrays different than “value” and pagination
    import requests
    # make sure to support different versions of the API path passed to the function
    def get_api_path(path: str) -> str:
    [tab]base_path = “https://api.powerbi.com/v1.0/myorg/”
    [tab]base_items = list(filter(lambda x: x, base_path.split(“/”)))
    [tab]path_items = list(filter(lambda x: x, path.split(“/”)))
    [tab]index = path_items.index(base_items[-1]) if base_items[-1] in path_items else -1
    [tab]return base_path + “/”.join(path_items[index+1:])
    # call the api_path with the given token and return the list in the “value” property
    def pbi_api(api_path: str, token: str, jsonitem: str = “value”) -> object:
    [tab]continuationUri = “”
    [tab]continuationToken = “”
    [tab]continua = True
    [tab]countloop = 1
    [tab]currentpath = get_api_path(api_path)
    [tab]values = []

    [tab]while(continua):
    [tab][tab]#print(f”countloop={countloop}————————————————————–“)

    [tab][tab]result = requests.get(currentpath, headers = {“authorization”: “Bearer ” + token})

    [tab][tab]if not result.ok:
    [tab][tab][tab]return [{“status_code”: result.status_code, “error”: result.reason}]
    [tab][tab]json = result.json()
    [tab][tab]#if not jsonitem in json:
    [tab][tab]# return []
    [tab][tab]valuestemp = json[jsonitem]
    [tab][tab]for value in valuestemp:
    [tab][tab][tab]if “id” in value:
    [tab][tab][tab][tab]value[“apiPath”] = f”{api_path}/{value[‘id’]}”
    [tab][tab][tab]else:
    [tab][tab][tab][tab]value[“apiPath”] = f”{api_path}”
    [tab][tab]values += valuestemp

    [tab][tab]if “continuationUri” in json and json[“continuationUri”] is not None:
    [tab][tab][tab]continuationUri = json[“continuationUri”]
    [tab][tab][tab]#print(f”VAR continuationUri={continuationUri}”)
    [tab][tab][tab]currentpath = continuationUri
    [tab][tab]else:
    [tab][tab][tab]continua = False

    [tab][tab]if “continuationToken” in json and json[“continuationToken”] is not None:
    [tab][tab][tab]continuationToken = json[“continuationToken”]
    [tab][tab][tab]#print(f”VAR continuationToken={continuationToken}”)
    [tab][tab]countloop = countloop + 1
    [tab][tab]if countloop > 1000:
    [tab][tab][tab]return [{“status_code”: -1, “error”: “Something is wrong!”}]
    [tab]return values

    import pyspark.sql.functions as F
    import pyspark.sql.types as T
    # schema of the function output – an array of maps to make it work with all API outputs
    schema = T.ArrayType(
    [tab]T.MapType(T.StringType(), T.StringType())
    )
    # register the function for PySpark
    pbi_api_udf = F.udf(lambda api_path, token: pbi_api(api_path, token), schema)
    # register the function for SparkSQL
    spark.udf.register(“pbi_api_udf”, pbi_api_udf)

    #register another function modified to handle other arrays different than “value”
    import pyspark.sql.functions as F
    import pyspark.sql.types as T
    # schema of the function output – an array of maps to make it work with all API outputs
    schema = T.ArrayType(
    [tab]T.MapType(T.StringType(), T.StringType())
    )
    # register the function for PySpark
    pbi_api_udf2 = F.udf(lambda api_path, token, jsonitem: pbi_api(api_path, token, jsonitem), schema)
    # register the function for SparkSQL
    spark.udf.register(“pbi_api_udf2”, pbi_api_udf2)

    %%sql
    SELECT explode(pbi_api_udf(‘/groups’, ‘${pbi_access_token}’)) as workspace

    Insert here Darren Gosbell’s code to be able to call admin APIs, removing only the “pip install” line because it resets the session

    spark.sql(f”SET pbi_admin_token={access_token}”)

    %%sql
    SELECT explode(pbi_api_udf(‘admin/capacities/refreshables?$top=200’, ‘${pbi_admin_token}’)) AS refreshables

    %%sql
    –Note: it supports only 1 day at a time
    SELECT explode(pbi_api_udf2(“/admin/activityevents?startDateTime=’2023-10-18T00:00:00’&endDateTime=’2023-10-18T23:59:59′”, ‘${pbi_admin_token}’, ‘activityEventEntities’)) AS atividades

  8. Great! Thank you very much!

    If anyone is interested, please see code below to call activityevents API with pagination.
    Gerhard, could you please delete my previous comments? I am sorry for the various comments, I had to adjust some code due to the blog comment engine automatic formatting.

    Please note in the code below:
    1) Each “—” below represents one notebook cell, so split the code in multiple cells.
    2) Replace “[tab]” below with proper identation, because blog comment engine removes all identation.


    %pip install azure.identity

    import pyspark.sql.functions as F
    import pyspark.sql.types as T
    import json, requests, pandas as pd
    import datetime
    from azure.identity import ClientSecretCredential
    import requests

    # if you need to call admin APIs, then you have to obtain admin token:
    # Darren Gosbell’s code to be able to call admin APIs:
    #########################################################################################
    # Read secretes from Azure Key Vault
    #########################################################################################
    key_vault = “https://dgosbellKeyVault.vault.azure.net/”
    tenant = mssparkutils.credentials.getSecret(key_vault , “FabricTenantId”)
    client = mssparkutils.credentials.getSecret(key_vault , “FabricClientId”)
    client_secret = mssparkutils.credentials.getSecret(key_vault , “FabricClientSecret”)
    # if you dont have secrets in Azure Key Vault, which is not recommended by the way, simply fill the variables below:
    #tenant = “???”
    #client = “???”
    #client_secret = “???”
    #########################################################################################
    # Authentication (for admin APIs)
    #########################################################################################
    # Generates the access token for the Service Principal
    api = ‘https://analysis.windows.net/powerbi/api/.default’
    auth = ClientSecretCredential(authority = ‘https://login.microsoftonline.com/’, tenant_id = tenant, client_id = client, client_secret = client_secret)
    access_token = auth.get_token(api)
    access_token = access_token.token
    print(‘\nSuccessfully authenticated.’)
    #########################################################################################
    # If you would like to test, then you can call Get Refreshables
    #########################################################################################
    #base_url = ‘https://api.powerbi.com/v1.0/myorg/’
    #header = {‘Authorization’: f’Bearer {access_token}’}
    #refreshables_url = “admin/capacities/refreshables?$top=200”
    #refreshables_response = requests.get(base_url + refreshables_url, headers=header)
    #print(refreshables_response.content)

    #if you dont need to call admin APIs, then you can obtain access token instead of admin token:
    #access_token = mssparkutils.credentials.getToken(“https://analysis.windows.net/powerbi/api”)

    spark.sql(f”SET pbi_access_token={access_token}”)

    #function pbi_api modified from original article to handle other json items different than “value” and also to handle pagination.
    # make sure to support different versions of the API path passed to the function
    def get_api_path(path: str) -> str:
    [tab]base_path = “https://api.powerbi.com/v1.0/myorg/”
    [tab]base_items = list(filter(lambda x: x, base_path.split(“/”)))
    [tab]path_items = list(filter(lambda x: x, path.split(“/”)))
    [tab]index = path_items.index(base_items[-1]) if base_items[-1] in path_items else -1
    [tab]return base_path + “/”.join(path_items[index+1:])
    # call the api_path with the given token and return the list in the “value” property
    def pbi_api(api_path: str, token: str, jsonitem: str = “value”) -> object:
    [tab]continuationUri = “”
    [tab]continuationToken = “”
    [tab]continua = True
    [tab]countloop = 1
    [tab]currentpath = get_api_path(api_path)
    [tab]values = []

    [tab]while(continua):
    [tab][tab]#print(f”countloop={countloop}————————————————————–“)

    [tab][tab]result = requests.get(currentpath, headers = {“authorization”: “Bearer ” + token})

    [tab][tab]if not result.ok:
    [tab][tab][tab]return [{“status_code”: result.status_code, “error”: result.reason}]
    [tab][tab]json = result.json()
    [tab][tab]#if not jsonitem in json:
    [tab][tab]# return []
    [tab][tab]valuestemp = json[jsonitem]
    [tab][tab]for value in valuestemp:
    [tab][tab][tab]if “id” in value:
    [tab][tab][tab][tab]value[“apiPath”] = f”{api_path}/{value[‘id’]}”
    [tab][tab][tab]else:
    [tab][tab][tab][tab]value[“apiPath”] = f”{api_path}”
    [tab][tab]values += valuestemp

    [tab][tab]if “continuationUri” in json and json[“continuationUri”] is not None:
    [tab][tab][tab]continuationUri = json[“continuationUri”]
    [tab][tab][tab]#print(f”VAR continuationUri={continuationUri}”)
    [tab][tab][tab]currentpath = continuationUri
    [tab][tab]else:
    [tab][tab][tab]continua = False

    [tab][tab]if “continuationToken” in json and json[“continuationToken”] is not None:
    [tab][tab][tab]continuationToken = json[“continuationToken”]
    [tab][tab][tab]#print(f”VAR continuationToken={continuationToken}”)
    [tab][tab]countloop = countloop + 1
    [tab][tab]if countloop > 1000:
    [tab][tab][tab]return [{“status_code”: -1, “error”: “Something is wrong!”}]
    [tab]return values

    # schema of the function output – an array of maps to make it work with all API outputs
    schema = T.ArrayType(
    [tab]T.MapType(T.StringType(), T.StringType())
    )
    # register the function for PySpark
    pbi_api_udf = F.udf(lambda api_path, token: pbi_api(api_path, token), schema)
    # register the function for SparkSQL
    spark.udf.register(“pbi_api_udf”, pbi_api_udf)

    #register another function modified to handle other arrays different than “value”
    # schema of the function output – an array of maps to make it work with all API outputs
    schema = T.ArrayType(
    [tab]T.MapType(T.StringType(), T.StringType())
    )
    # register the function for PySpark
    pbi_api_udf2 = F.udf(lambda api_path, token, jsonitem: pbi_api(api_path, token, jsonitem), schema)
    # register the function for SparkSQL
    spark.udf.register(“pbi_api_udf2”, pbi_api_udf2)

    %%sql
    –Test udf with non-admin API:
    SELECT explode(pbi_api_udf(‘/groups’, ‘${pbi_access_token}’)) as workspace

    %%sql
    –Test udf with admin API:
    SELECT explode(pbi_api_udf(‘admin/capacities/refreshables?$top=200’, ‘${pbi_access_token}’)) AS refreshables

    %%sql
    –Test udf with admin API with pagination and also returning activityEventEntities json items:
    –Note: This API supports only 1 day at a time
    SELECT explode(pbi_api_udf2(“/admin/activityevents?startDateTime=’2023-10-18T00:00:00’&endDateTime=’2023-10-18T23:59:59′”, ‘${pbi_access_token}’, ‘activityEventEntities’)) AS atividades

  9. Interesting to see all the different approaches to working with the Power BI REST API in notebooks.

    My attempt at calling the admin APIs (specifically the Activity Events API and dumping the results into a Delta table is here: https://github.com/klinejordan/fabric-tenant-admin-notebooks/blob/main/Fabric%20Admin%20Activities.ipynb

    Have not had much like with Spark UDFs calling more complex APIs like the Scanner API as they don’t seem to be able to handle polling very well but that could just be my inexperience with Spark

  10. Pingback: how to call power bi rest apis from fabric data pipelines.

  11. Hello! Thank you for the article!

    Would u know if it is possible to read data from the semantic model in power bi SERVICE directly to databricks where I can perform some data query? Is it possible to do this with SELF API or do you know of any other ways to perform this task? Thank you!

Leave a Reply

Your email address will not be published. Required fields are marked *

*