SahamatiNet
Copyright © 2025 - Sahamati Foundation


Python


Without Router - Current Approach
# Existing logic before Router integration: the request goes directly to the FIP
import requests

entity_metadata_from_cr = {
    "baseUrl": "http://fip-1.dev.sahamati.org.in/fip-simulate",
    "id": "FIP-SIMULATOR"
}

def get_http_config(base_url, route, headers, data, method_type):
    return {
        "url": f"{base_url}{route}",
        "headers": headers,
        "data": data,
        "method": method_type
    }

def execute_discovery_request():
    route = "/v2/Accounts/discover"
    
    config = get_http_config(
        base_url=entity_metadata_from_cr["baseUrl"],
        route=route,
        headers={},
        data={},
        method_type="POST"
    )
    
    try:
        response = requests.request(**config)
        # Surface HTTP error statuses (4xx/5xx) instead of parsing them as success
        response.raise_for_status()
        return response.json()
    except Exception as error:
        print(f"Error making discovery request: {error}")
        raise


def perform_another_operations(account_discover_response):
    print("Performing further operations with the response:", account_discover_response)

if __name__ == "__main__":
    try:
        response = execute_discovery_request()
        perform_another_operations(response)
        print("Account discovery successful.")
    except Exception:
        print("Account discovery failed.")
    finally:
        print("Account discovery completed.")

With Router Integration
# Changes needed to send the same request through the Router

import requests
import json
import base64

# START - New changes

def generate_base64_encoded_json(json_data):
    json_str = json.dumps(json_data)
    return base64.b64encode(json_str.encode()).decode()

def generate_request_meta(recipient_id):
    context = {"recipient-id": recipient_id}
    return generate_base64_encoded_json(context)

def add_sahamati_configuration(config, route):
    # Send the request to the Router instead of the recipient's own base URL
    router_url = "http://api.dev.sahamati.org.in/router"
    config["url"] = f"{router_url}{route}"
    # Carry the recipient's entity ID in the x-request-meta header
    headers = config.setdefault("headers", {})
    headers["x-request-meta"] = generate_request_meta(entity_metadata_from_cr["id"])
    return config

# END - New changes

# Existing code (unchanged from the previous snippet)
entity_metadata_from_cr = {
    "baseUrl": "http://fip-1.dev.sahamati.org.in/fip-simulate",
    "id": "FIP-SIMULATOR"
}

def get_http_config(base_url, route, headers, data, method_type):
    return {
        "url": f"{base_url}{route}",
        "headers": headers,
        "data": data,
        "method": method_type
    }

def execute_discovery_request():
    route = "/v2/Accounts/discover"
    
    config = get_http_config(
        base_url=entity_metadata_from_cr["baseUrl"],
        route=route,
        headers={},
        data={},
        method_type="POST"
    )

    # add Sahamati configuration [Start]
    config = add_sahamati_configuration(config, route)
    # add Sahamati configuration [End]

    try:
        response = requests.request(**config)
        # Surface HTTP error statuses (4xx/5xx) instead of parsing them as success
        response.raise_for_status()
        return response.json()
    except Exception as error:
        print(f"Error making discovery request: {error}")
        raise

def perform_another_operations(account_discover_response):
    print("Performing further operations with the response:", account_discover_response)

if __name__ == "__main__":
    try:
        response = execute_discovery_request()
        perform_another_operations(response)
        print("Account discovery successful.")
    except Exception as e:
        print(e)
        print("Account discovery failed.")
    finally:
        print("Account discovery completed.")
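
Verifying the x-request-meta header

When debugging an integration, it can help to confirm what the x-request-meta header actually contains before any request is sent. The sketch below rebuilds the header value the same way the snippet above does (a Base64-encoded JSON object with a recipient-id key) and decodes it again. The helper names encode_request_meta and decode_request_meta are hypothetical and are not part of any Sahamati SDK. The sketch makes no network calls and assumes nothing about how the Router itself parses the header.

```python
import base64
import json


def encode_request_meta(recipient_id):
    # Same encoding as generate_request_meta in the snippet above:
    # JSON object -> UTF-8 bytes -> Base64 text
    context = {"recipient-id": recipient_id}
    return base64.b64encode(json.dumps(context).encode()).decode()


def decode_request_meta(header_value):
    # Hypothetical helper for local debugging: reverse the encoding
    # so the header contents can be inspected
    return json.loads(base64.b64decode(header_value).decode())


header_value = encode_request_meta("FIP-SIMULATOR")
decoded = decode_request_meta(header_value)

print("x-request-meta:", header_value)
print("decoded:", decoded)
```

If the decoded dictionary does not show the entity ID expected for the recipient, the issue is likely in how the header is built rather than in the request itself.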
