How to Take MSSQL Table-Level Backups and Restore Them Using Python

MSSQL, Python, Backup, Restore, AWS, Data

MSSQL Table level Backup

Overview of the Process #

  1. An EC2 Linux box hosts the Python script, which talks to AWS Secrets Manager to collect the MSSQL username and password.
  2. The Python script is run against the MSSQL EC2 Windows box to take the table-level backup and also to restore it.

Introduction #

This blog post explains how to back up data from an MSSQL server and restore it at the table level using Python.

We assume that you already have MSSQL configured on your EC2 instance. For this post I have configured it as a standalone instance, but the best practice is to have a standby or AlwaysOn Availability Groups configured for high availability. We also assume that you have enabled network communication between the Linux and Windows boxes and attached an IAM role with the required permissions to the Linux box so that it can talk to Secrets Manager.

Create a secret in Secrets Manager #

We assume that you have permission to create secrets in Secrets Manager and that you have AWS CLI access.

Step - 1 First, create a JSON file named db-credentials.json #

{
  "host": "your-mssql-server.example.com",
  "database": "your_database",
  "username": "your_username",
  "password": "your_password"
}

Step - 2 Then create the secret using the file #

aws secretsmanager create-secret \
    --name "mssql/credentials" \
    --description "MSSQL database credentials" \
    --secret-string file://db-credentials.json

Python script to take a backup #

The following Python script takes a table-level backup from the MSSQL server.

Note: Kindly supply the following details to make the script work as expected.

SECRET_ID
REGION
TABLE_NAMES
OUTPUT_DIR
SEPARATE_FILES
import boto3
import json
import pymssql
import os
from datetime import datetime

# Global configuration -- replace every placeholder below before running the script.
SECRET_ID = "your-secret-id"  # AWS Secrets Manager secret ID passed to get_secret()
REGION = "<YOU_REGION>"          # AWS region used to create the Secrets Manager client
# Names of the tables to back up (each one is processed in list order)
TABLE_NAMES = [
    "table1", # Replace these with your actual table names.
    "table2",
    "table3"
    # Add more tables as needed
]
OUTPUT_DIR = "<OUTPUT_DIR>"      # Directory the backup .sql files are written to
# Set to True to create one file per table, False to create a single file with all tables
SEPARATE_FILES = False

def get_secret(secret_id, region_name):
    """
    Retrieve a JSON secret from AWS Secrets Manager and return it parsed.

    Args:
        secret_id: Identifier of the secret, passed as ``SecretId``.
        region_name: AWS region used to build the Secrets Manager client.

    Returns:
        The secret's ``SecretString`` decoded with ``json.loads``.

    Raises:
        ValueError: If the response carries no ``SecretString`` (for example
            a binary-only secret).
        Exception: Any error from boto3 or from JSON decoding is printed and
            then re-raised unchanged.
    """
    session = boto3.session.Session()
    client = session.client(service_name='secretsmanager', region_name=region_name)

    try:
        get_secret_value_response = client.get_secret_value(SecretId=secret_id)
        if 'SecretString' not in get_secret_value_response:
            # Falling through here used to return None silently, which only
            # surfaced later as an unrelated AttributeError in the caller.
            raise ValueError(f"Secret {secret_id} does not contain a SecretString value")
        return json.loads(get_secret_value_response['SecretString'])
    except Exception as e:
        print(f"Error retrieving secret: {e}")
        raise

def _quote_identifier(name):
    """Return *name* as a bracket-quoted T-SQL identifier (``]`` is doubled)."""
    return "[" + str(name).replace("]", "]]") + "]"


def _sql_literal(val):
    """Render one Python value read from the database as a T-SQL literal."""
    # Local imports: the module itself only imports ``datetime.datetime``.
    from datetime import date, time as dt_time
    from uuid import UUID

    if val is None:
        return "NULL"
    if isinstance(val, bool):
        # Must be tested before the generic fallback: str(True) is "True",
        # which is not valid T-SQL for a bit column.
        return "1" if val else "0"
    if isinstance(val, str):
        # N'...' keeps non-ASCII characters intact; embedded quotes are doubled.
        return "N'" + val.replace("'", "''") + "'"
    if isinstance(val, (bytes, bytearray)):
        # Binary data is written as a hexadecimal constant (0x...).
        return "0x" + bytes(val).hex()
    if isinstance(val, datetime):
        text = val.strftime('%Y-%m-%d %H:%M:%S')
        if val.microsecond:
            # Keep milliseconds only: the datetime type rejects more digits.
            text += f".{val.microsecond // 1000:03d}"
        return f"'{text}'"
    if isinstance(val, (date, dt_time, UUID)):
        # Unquoted, 2020-01-02 would be evaluated as integer arithmetic.
        return f"'{val}'"
    return str(val)


def backup_table(conn, cursor, table_name, output_file):
    """
    Backup a single table schema and data to a file

    The section appended to ``output_file`` consists of a
    ``-- Table Schema for <table>`` header, the CREATE statement, an optional
    primary-key ``ALTER TABLE`` statement and one ``INSERT`` per row.

    Args:
        conn: Open database connection (kept for interface compatibility;
            only ``cursor`` is used).
        cursor: Cursor on that connection; must accept ``%s`` parameters.
        table_name: Name of the table to back up.
        output_file: Path of the file to append to.

    Returns:
        True on success, False if any step failed (the error is printed).
    """
    try:
        # Run every query BEFORE touching the file so that a database error
        # cannot leave a half-written section in a shared backup file.

        # OBJECT_DEFINITION yields NULL for objects without a stored
        # definition, in which case the schema is rebuilt from metadata.
        cursor.execute("SELECT OBJECT_DEFINITION(OBJECT_ID(%s))", (table_name,))
        schema = cursor.fetchone()[0]

        if not schema:
            cursor.execute("""
                SELECT %s +
                STUFF((
                    SELECT ', ' + QUOTENAME(COLUMN_NAME) + ' ' +
                        DATA_TYPE +
                        CASE
                            WHEN DATA_TYPE IN ('char', 'varchar', 'nchar', 'nvarchar', 'binary', 'varbinary')
                            THEN '(' +
                                CASE
                                    WHEN CHARACTER_MAXIMUM_LENGTH = -1 THEN 'max'
                                    ELSE CAST(CHARACTER_MAXIMUM_LENGTH AS VARCHAR(10))
                                END + ')'
                            WHEN DATA_TYPE IN ('decimal', 'numeric')
                            THEN '(' + CAST(NUMERIC_PRECISION AS VARCHAR(10)) + ', ' +
                                CAST(NUMERIC_SCALE AS VARCHAR(10)) + ')'
                            ELSE ''
                        END +
                        CASE WHEN IS_NULLABLE = 'NO' THEN ' NOT NULL' ELSE ' NULL' END
                    FROM INFORMATION_SCHEMA.COLUMNS
                    WHERE TABLE_NAME = %s
                    ORDER BY ORDINAL_POSITION
                    FOR XML PATH('')
                ), 1, 2, '') + ');'
            """, (f"CREATE TABLE {table_name} (", table_name))
            schema = cursor.fetchone()[0]

        if not schema:
            # No columns found: the table does not exist or is not visible.
            raise ValueError(f"no schema information found for table {table_name}")

        # Primary-key columns, in key order (matters for composite keys).
        cursor.execute("""
            SELECT COLUMN_NAME
            FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE
            WHERE OBJECTPROPERTY(OBJECT_ID(CONSTRAINT_NAME), 'IsPrimaryKey') = 1
            AND TABLE_NAME = %s
            ORDER BY ORDINAL_POSITION
        """, (table_name,))
        pk_columns = [row[0] for row in cursor.fetchall()]

        # Column names come from the result set itself, so they are
        # guaranteed to line up with the values in every fetched row.
        cursor.execute(f"SELECT * FROM {table_name}")
        columns = [column[0] for column in cursor.description]
        rows = cursor.fetchall()

        with open(output_file, 'a') as f:
            f.write(f"-- Table Schema for {table_name}\n")
            f.write(f"{schema}\n\n")

            if pk_columns:
                pk_list = ', '.join(_quote_identifier(col) for col in pk_columns)
                pk_constraint = f"ALTER TABLE {table_name} ADD CONSTRAINT PK_{table_name} PRIMARY KEY ({pk_list});"
                f.write(f"-- Primary Key\n{pk_constraint}\n\n")

            if rows:
                f.write(f"-- Data for {table_name}\n")
                column_list = ', '.join(_quote_identifier(col) for col in columns)
                prefix = f"INSERT INTO {table_name} ({column_list}) VALUES ("
                f.writelines(
                    f"{prefix}{', '.join(_sql_literal(val) for val in row)});\n"
                    for row in rows
                )
                f.write("\n")  # Add a blank line after table data

        return True

    except Exception as e:
        print(f"Error backing up table {table_name}: {e}")
        return False

def backup_tables(server, database, username, password, table_names, output_dir, separate_files):
    """
    Backup multiple tables schema and data

    Args:
        server: Host name of the MSSQL server.
        database: Database to connect to.
        username: Login name.
        password: Login password.
        table_names: Iterable of table names to back up.
        output_dir: Directory for the backup files (created if missing).
        separate_files: True writes ``<table>_<timestamp>.sql`` per table,
            False writes one ``all_tables_<timestamp>.sql``.

    Returns:
        True only if every table was backed up; False if any table failed or
        an error occurred (connection, directory creation, ...).
    """
    conn = None  # defined up front so the finally block can test it safely
    try:
        # Connect to the database
        conn = pymssql.connect(server=server, user=username, password=password, database=database)
        cursor = conn.cursor()

        # exist_ok avoids the check-then-create race of exists() + makedirs()
        os.makedirs(output_dir, exist_ok=True)

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        failed_tables = []

        if separate_files:
            # Create a separate file for each table
            for table_name in table_names:
                output_file = os.path.join(output_dir, f"{table_name}_{timestamp}.sql")
                print(f"Backing up table {table_name}...")
                if backup_table(conn, cursor, table_name, output_file):
                    print(f"Successfully backed up table {table_name} to {output_file}")
                else:
                    print(f"Failed to back up table {table_name}")
                    failed_tables.append(table_name)
        else:
            # Create a single file with all tables
            output_file = os.path.join(output_dir, f"all_tables_{timestamp}.sql")

            # Add a header to the file
            with open(output_file, 'w') as f:
                f.write(f"-- MSSQL Database Backup\n")
                f.write(f"-- Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
                f.write(f"-- Database: {database}\n")
                f.write(f"-- Tables: {', '.join(table_names)}\n\n")

            # Back up each table to the same file (backup_table appends)
            for table_name in table_names:
                print(f"Backing up table {table_name}...")
                if backup_table(conn, cursor, table_name, output_file):
                    print(f"Successfully backed up table {table_name}")
                else:
                    print(f"Failed to back up table {table_name}")
                    failed_tables.append(table_name)

            if failed_tables:
                # Do not claim success when part of the backup is missing.
                print(f"Backup written to {output_file}, but these tables failed: {', '.join(failed_tables)}")
            else:
                print(f"All tables backed up to {output_file}")

        return not failed_tables

    except Exception as e:
        print(f"Error during backup: {e}")
        return False
    finally:
        if conn is not None:
            conn.close()

def main():
    """
    Script entry point: read the credentials, validate the configuration and
    run the backup. Every problem is reported with a printed message.
    """
    try:
        # Credentials and connection details live in Secrets Manager
        secret = get_secret(SECRET_ID, REGION)

        required_keys = ('host', 'database', 'username', 'password')
        server, database, username, password = (secret.get(key) for key in required_keys)

        # Stop early when any connection detail is absent or empty
        absent = [key for key in required_keys if not secret.get(key)]
        if absent:
            print(f"Error: Missing required secret values: {', '.join(absent)}")
            return

        # Nothing to do without at least one table
        if not TABLE_NAMES:
            print("Error: No tables specified for backup")
            return

        backup_tables(server, database, username, password, TABLE_NAMES, OUTPUT_DIR, SEPARATE_FILES)

    except Exception as e:
        print(f"Error: {e}")


if __name__ == "__main__":
    main()

Python script to restore the tables #

The following Python script restores the tables in the MSSQL server.

Note: Kindly supply the following details to make the script work as expected.

SECRET_ID
REGION
INPUT_FILE
EXECUTE_SQL  #Set to True to execute the SQL, False to just print it (dry run)
import boto3
import json
import pymssql
import os
import re
import argparse
from datetime import datetime

# Global configuration -- replace every placeholder below before running the script.
SECRET_ID = "your-secret-id"  # AWS Secrets Manager secret ID passed to get_secret()
REGION = "<YOU_REGION>"          # AWS region used to create the Secrets Manager client
INPUT_FILE = "<YOU_BACKUP_FILE_NAME>"  # Path of the SQL backup file to restore from
# Set to True to execute the SQL, False to just print it (dry run)
EXECUTE_SQL = True

def get_secret(secret_id, region_name):
    """
    Retrieve a JSON secret from AWS Secrets Manager and return it parsed.

    Args:
        secret_id: Identifier of the secret, passed as ``SecretId``.
        region_name: AWS region used to build the Secrets Manager client.

    Returns:
        The secret's ``SecretString`` decoded with ``json.loads``.

    Raises:
        ValueError: If the response carries no ``SecretString`` (for example
            a binary-only secret).
        Exception: Any error from boto3 or from JSON decoding is printed and
            then re-raised unchanged.
    """
    session = boto3.session.Session()
    client = session.client(service_name='secretsmanager', region_name=region_name)

    try:
        get_secret_value_response = client.get_secret_value(SecretId=secret_id)
        if 'SecretString' not in get_secret_value_response:
            # Falling through here used to return None silently, which only
            # surfaced later as an unrelated AttributeError in the caller.
            raise ValueError(f"Secret {secret_id} does not contain a SecretString value")
        return json.loads(get_secret_value_response['SecretString'])
    except Exception as e:
        print(f"Error retrieving secret: {e}")
        raise

def parse_sql_file(file_path):
    """
    Parse SQL file into one SQL section per table

    A section starts at a line of the form ``-- Table Schema for <name>`` and
    runs up to the next such line (or the end of the file).

    Args:
        file_path: Path of the backup file to read.

    Returns:
        Dict mapping each table name to its stripped section text, in file
        order. If a name occurs more than once, the last section wins.

    Raises:
        Exception: Any error while reading the file is printed and re-raised.
    """
    try:
        with open(file_path, 'r') as f:
            content = f.read()

        # Anchor on whole header lines and capture the full name, so that
        # schema-qualified names (dbo.sales) are not truncated.
        header_pattern = re.compile(r'^-- Table Schema for (.+?)[ \t]*$', re.MULTILINE)
        headers = list(header_pattern.finditer(content))

        # Slice by the match positions themselves. Searching the text again
        # for each header would hit a longer name sharing the same prefix
        # (e.g. "orders" vs "orders_items") and cut the sections wrongly.
        tables_data = {}
        for i, header in enumerate(headers):
            if i + 1 < len(headers):
                end_pos = headers[i + 1].start()
            else:
                end_pos = len(content)
            tables_data[header.group(1)] = content[header.start():end_pos].strip()

        return tables_data

    except Exception as e:
        print(f"Error parsing SQL file: {e}")
        raise

def extract_statements(table_sql):
    """
    Extract CREATE TABLE, ALTER TABLE, and INSERT statements from table SQL

    The text is split on ``;`` terminators that lie outside quoted strings,
    bracketed identifiers and ``--`` comments, so a value such as ``'a; b'``
    or a value spanning several lines stays inside its statement.

    Args:
        table_sql: SQL text of one table section.

    Returns:
        Dict with ``'create'`` (first CREATE TABLE statement or None),
        ``'alter'`` (first ALTER TABLE statement or None) and ``'inserts'``
        (list of INSERT statements in order). Each statement keeps its
        trailing ``;``.
    """
    # Tokens that matter for splitting: '...' string pieces (an escaped ''
    # simply reads as two adjacent pieces), [bracketed] identifiers,
    # -- line comments, and the ; terminator itself.
    token_pattern = re.compile(r"'[^']*'|\[[^\]]*\]|--[^\n]*|;")

    statements = []
    pieces = []
    position = 0
    for token in token_pattern.finditer(table_sql):
        pieces.append(table_sql[position:token.start()])
        position = token.end()
        text = token.group(0)
        if text == ';':
            statement = ''.join(pieces).strip()
            if statement:
                statements.append(statement + ';')
            pieces = []
        elif not text.startswith('--'):
            # Quoted text belongs to the statement; comments are dropped.
            pieces.append(text)
    # Trailing text without a terminating ';' is ignored.

    create_stmt = None
    alter_stmt = None
    insert_stmts = []
    for statement in statements:
        if re.match(r'CREATE\s+TABLE\b', statement, re.IGNORECASE):
            if create_stmt is None:
                create_stmt = statement
        elif re.match(r'ALTER\s+TABLE\b', statement, re.IGNORECASE):
            if alter_stmt is None:
                alter_stmt = statement
        elif re.match(r'INSERT\s+INTO\b', statement, re.IGNORECASE):
            insert_stmts.append(statement)

    return {
        'create': create_stmt,
        'alter': alter_stmt,
        'inserts': insert_stmts
    }

def restore_table(conn, cursor, table_name, statements):
    """
    Restore a table using the provided SQL statements

    The drop, create, primary key and inserts run as ONE transaction that is
    committed only at the end, so a failure at any step rolls everything
    back and the previously existing table is kept. This relies on the
    connection not being in autocommit mode.

    Args:
        conn: Open database connection (used for commit / rollback).
        cursor: Cursor on that connection; must accept ``%s`` parameters.
        table_name: Name of the table to (re)create.
        statements: Dict with ``'create'``, ``'alter'`` and ``'inserts'``
            as produced by ``extract_statements``.

    Returns:
        True if the table was restored and committed, False otherwise.
    """
    try:
        # Validate BEFORE touching the database: dropping the live table and
        # only then finding out there is nothing to recreate it from would
        # lose the table for good.
        if not statements['create']:
            print(f"Error: No CREATE TABLE statement found for {table_name}")
            return False

        # Check if table exists
        cursor.execute("SELECT OBJECT_ID(%s, 'U')", (table_name,))
        table_exists = cursor.fetchone()[0] is not None

        # Drop table if it exists (not committed yet, see docstring)
        if table_exists:
            print(f"Table {table_name} exists. Dropping it...")
            cursor.execute(f"DROP TABLE {table_name}")

        # Create table
        print(f"Creating table {table_name}...")
        cursor.execute(statements['create'])

        # Add primary key
        if statements['alter']:
            print(f"Adding primary key to {table_name}...")
            cursor.execute(statements['alter'])

        # Insert data
        if statements['inserts']:
            print(f"Inserting {len(statements['inserts'])} rows into {table_name}...")
            for insert_stmt in statements['inserts']:
                cursor.execute(insert_stmt)
                # Process any info messages to avoid hanging
                while cursor.nextset():
                    pass

        # Single commit: the old table is replaced only if every step worked.
        conn.commit()

        print(f"Table {table_name} restored successfully.")
        return True

    except Exception as e:
        print(f"Error restoring table {table_name}: {e}")
        conn.rollback()
        return False

def restore_tables(server, database, username, password, input_file, execute_sql):
    """
    Restore tables from SQL file

    Args:
        server: Host name of the MSSQL server.
        database: Database to connect to.
        username: Login name.
        password: Login password.
        input_file: Path of the backup file to restore from.
        execute_sql: True runs the statements against the database; False is
            a dry run that only prints what was found in the file.

    Returns:
        For a real run, True only if every table was restored. For a dry
        run, True once the statements have been printed. False if the file
        holds no tables or an error occurred.
    """
    conn = None  # defined up front so the finally block can always close it
    try:
        # Parse SQL file
        print(f"Parsing SQL file: {input_file}")
        tables_data = parse_sql_file(input_file)

        if not tables_data:
            print("No tables found in the SQL file.")
            return False

        print(f"Found {len(tables_data)} tables: {', '.join(tables_data.keys())}")

        if execute_sql:
            # Connect to the database
            conn = pymssql.connect(server=server, user=username, password=password, database=database)
            cursor = conn.cursor()

            # Restore each table
            success_count = 0
            for table_name, table_sql in tables_data.items():
                print(f"\nProcessing table: {table_name}")
                statements = extract_statements(table_sql)
                if restore_table(conn, cursor, table_name, statements):
                    success_count += 1

            print(f"\nRestore completed. {success_count} of {len(tables_data)} tables restored successfully.")

            # Report failure when at least one table could not be restored.
            return success_count == len(tables_data)

        # Dry run - just print the statements
        for table_name, table_sql in tables_data.items():
            print(f"\nStatements for table: {table_name}")
            statements = extract_statements(table_sql)

            print("\nCREATE TABLE statement:")
            print(statements['create'])

            if statements['alter']:
                print("\nALTER TABLE statement:")
                print(statements['alter'])

            print(f"\nINSERT statements: {len(statements['inserts'])} total")
            if statements['inserts']:
                print("First INSERT statement:")
                print(statements['inserts'][0])
                if len(statements['inserts']) > 1:
                    print("...")

        return True

    except Exception as e:
        print(f"Error during restore: {e}")
        return False
    finally:
        # Runs on every exit path, so the connection no longer leaks when an
        # exception is raised after connecting.
        if conn is not None:
            conn.close()

def main():
    """
    Script entry point: read the credentials, validate the configuration and
    run the restore. Every problem is reported with a printed message.
    """
    try:
        # Credentials and connection details live in Secrets Manager
        secret = get_secret(SECRET_ID, REGION)

        required_keys = ('host', 'database', 'username', 'password')
        server, database, username, password = (secret.get(key) for key in required_keys)

        # Stop early when any connection detail is absent or empty
        absent = [key for key in required_keys if not secret.get(key)]
        if absent:
            print(f"Error: Missing required secret values: {', '.join(absent)}")
            return

        # The backup file has to exist before anything else happens
        if not os.path.exists(INPUT_FILE):
            print(f"Error: Input file not found: {INPUT_FILE}")
            return

        restore_tables(server, database, username, password, INPUT_FILE, EXECUTE_SQL)

    except Exception as e:
        print(f"Error: {e}")


if __name__ == "__main__":
    main()

I hope this blog post helps you back up and restore MSSQL data at the table level. Happy Automation 🤗