1、首先需要明确MongoDB与Kingbase(金仓)的对应关系:collection相当于表(table),document相当于表中的一行,field相当于字段(列),根据这个对应关系创建表;
此次迁移的MongoDB里的数据字段是:_id(自动生成的ObjectId),image(转成二进制存储的文件内容)
所以在金仓里创建表 create table dzzzwj(id varchar, image bytea);(表名需与下方脚本中INSERT语句使用的表名保持一致;_id是ObjectId类型,写入varchar列前需要转成字符串)
2、安装Python环境:由于是内网环境,没有yum源,需要先在能连接互联网的机器上下载好相应的安装包,再拷贝到内网服务器上安装
Python:3.9.0版本
用到以下这些包(其中ksycopg2是金仓的Python驱动;下方迁移脚本中写的是psycopg2,实际使用时请与所安装的驱动保持一致,另外脚本还用到了标准库os):
import pymongo
import ksycopg2
import concurrent.futures
from ksycopg2 import pool
import logging
from urllib.parse import quote_plus
------------------------------------------------------------------------------------
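pip3 download pymongo -d pymongo_packages    # 在能上网的机器上下载pymongo及其依赖包到pymongo_packages目录(该机器的Python版本和系统平台需与内网服务器一致)
pip3 install --no-index --find-links=pymongo_packages pymongo    # 将pymongo_packages目录拷贝到内网服务器后离线安装pymongo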
金仓的Python驱动(ksycopg2)可以到金仓官网下载,需要选择与Python版本(此处为3.9)及操作系统平台对应的版本
以下是Python脚本内容:
import pymongo
import psycopg2
import concurrent.futures
from psycopg2 import pool
import logging
from urllib.parse import quote_plus
import os

# NOTE(review): the Kingbase driver ships as ksycopg2 (see the package list
# above), while this script imports psycopg2 -- confirm which driver is installed.

# Initialize logging.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')

# MongoDB settings. Each value can be overridden with an environment variable so
# credentials need not live in the script; the literals are the previous
# hard-coded values and remain the defaults.
username = os.environ.get('MONGO_USER', 'admin')
password = os.environ.get('MONGO_PASSWORD', 'SCJGscjg@123')
host = os.environ.get('MONGO_HOST', '10.253.228.41')
port = os.environ.get('MONGO_PORT', '27017')
# Percent-encode so characters such as '@' in the password do not break the URI.
encoded_username = quote_plus(username)
encoded_password = quote_plus(password)
uri = f"mongodb://{encoded_username}:{encoded_password}@{host}:{port}/"
mongo_client = pymongo.MongoClient(uri)
mongo_db = mongo_client['admin']
mongo_collection = mongo_db['admin']

# Kingbase connection pool (thread-safe pool class); same override scheme.
kb_pool = psycopg2.pool.ThreadedConnectionPool(
    minconn=1,
    maxconn=20,
    host=os.environ.get('KB_HOST', "10.253.228.110"),
    database=os.environ.get('KB_DATABASE', "mongo"),
    user=os.environ.get('KB_USER', "system"),
    password=os.environ.get('KB_PASSWORD', "1"),
    port=os.environ.get('KB_PORT', "54322"),
)

# File that stores the resume offset (number of documents already migrated).
OFFSET_FILE = 'offset.txt'

def read_offset(path=None):
    """Return the saved resume offset (documents already migrated).

    Args:
        path: offset file to read; defaults to the module-level OFFSET_FILE.

    Returns:
        The integer stored in the file, or 0 when the file does not exist yet,
        i.e. the migration starts from the beginning.

    Raises:
        ValueError: if the file exists but does not hold an integer. This is
            deliberately not treated as 0, because restarting from the
            beginning would insert every already-migrated row a second time.
    """
    if path is None:
        path = OFFSET_FILE
    try:
        with open(path, 'r', encoding='utf-8') as f:
            content = f.read().strip()
    except FileNotFoundError:
        return 0
    try:
        return int(content)
    except ValueError:
        raise ValueError(
            f"offset file {path!r} does not contain an integer: {content!r}"
        ) from None
def write_offset(offset, path=None):
    """Persist ``offset`` (documents committed so far) to the offset file.

    The value goes to a temporary file beside the target, which is flushed to
    disk and then moved over the target with ``os.replace``. A kill or crash
    during the write therefore leaves either the old or the new value, never
    a truncated/empty file that would make the next ``read_offset`` fail.

    Args:
        offset: number of documents already committed to Kingbase.
        path: file to write; defaults to the module-level OFFSET_FILE.
    """
    if path is None:
        path = OFFSET_FILE
    tmp_path = f"{path}.tmp"
    with open(tmp_path, 'w', encoding='utf-8') as f:
        f.write(str(offset))
        f.flush()
        os.fsync(f.fileno())
    os.replace(tmp_path, path)
def batch_insert(mongo_data):
    """Insert one batch of MongoDB documents into Kingbase as one transaction.

    Args:
        mongo_data: documents read from MongoDB; each is indexed with
            ``'_id'`` and ``'image'``.

    Returns:
        True if the whole batch was committed; False on any error. On failure
        the transaction is rolled back, so no row of the batch is kept and
        the caller can retry the same batch without creating duplicates.
    """
    insert_query = "INSERT INTO dzzzwj(id, image) VALUES (%s, %s)"
    kb_conn = None
    try:
        # psycopg2-style drivers have no adapter for bson ObjectId, and the
        # target column is varchar, so store the ObjectId's string form.
        rows = [(str(data['_id']), data['image']) for data in mongo_data]
        kb_conn = kb_pool.getconn()
        with kb_conn.cursor() as kb_cursor:
            kb_cursor.executemany(insert_query, rows)
        kb_conn.commit()
        return True
    except Exception as e:
        # logging.exception also records the traceback for diagnosis.
        logging.exception(f"批量插入错误: {e}")
        if kb_conn is not None and not kb_conn.closed:
            # Discard the partial batch before the connection goes back to the pool.
            try:
                kb_conn.rollback()
            except Exception:
                logging.exception("rollback failed")
        return False
    finally:
        if kb_conn is not None:
            kb_pool.putconn(kb_conn)
def main():
    """Migrate the MongoDB collection into Kingbase in resumable batches.

    Documents are read in pages of ``batch_size`` ordered by ``_id``, starting
    at the offset saved by ``write_offset``. After a batch is committed, the
    offset is advanced by the number of documents copied and persisted, so a
    restarted run continues where the previous one stopped.

    Batches are processed one at a time. The original submitted each batch to
    a thread pool but immediately blocked on ``future.result()``, so it was
    already sequential. Strict ordering is also what keeps the offset file
    correct: it may only move past batches that are committed.

    A batch that still fails after ``max_retries`` attempts stops the run
    instead of being retried forever. The offset is left pointing at that
    batch, so rerunning after fixing the cause resumes from there.
    """
    batch_size = 80
    max_retries = 3
    offset = read_offset()

    try:
        while True:
            # Without an explicit sort MongoDB does not guarantee result order,
            # so skip()/limit() pages could overlap or miss documents between
            # queries and between runs. Sorting on _id makes paging stable.
            mongo_data = list(
                mongo_collection.find()
                .sort('_id', pymongo.ASCENDING)
                .skip(offset)
                .limit(batch_size)
            )
            if not mongo_data:
                break

            for _ in range(max_retries):
                if batch_insert(mongo_data):
                    break
                logging.error(f"Batch failed with offset {offset}")
            else:
                logging.error(
                    f"Giving up at offset {offset} after {max_retries} attempts; "
                    "fix the cause and rerun to resume from this offset."
                )
                break

            logging.info(f"Batch completed with offset {offset}")
            # Advance by what was actually copied; the last page may be short.
            offset += len(mongo_data)
            write_offset(offset)
    except Exception as e:
        logging.exception(f"主循环错误: {e}")
    finally:
        mongo_client.close()
        kb_pool.closeall()
        logging.info("资源已清理完毕。")


if __name__ == "__main__":
    main()
这段代码思路:
(1)连接MongoDB和Kingbase数据库;
(2)因为MongoDB数据量比较大,并且需要断点续传,所以采用了分页和排序(排序是为了保证每次分页查询返回的顺序稳定,不会重复或遗漏数据);
(3)每批数据成功插入金仓数据库后,增加偏移量,并将当前偏移量记录在offset.txt里,这样脚本中断后重新启动时可以从该位置继续迁移数据;
因为同一份二进制数据分别从MongoDB和金仓数据库中查询出来后,显示形式不一样,无法直接比对,所以下面是分别计算两边数据的md5值并进行对比的简单代码:
- import pymongo
- import ksycopg2
- import base64
- import hashlib
def compute_hash(data):
    """Return the hexadecimal MD5 digest of the bytes-like object ``data``."""
    digest = hashlib.md5()
    digest.update(data)
    return digest.hexdigest()
# Test script: copy the MongoDB documents into the Kingbase table zzwj, then
# read each image back by id and compare MD5 digests of the two copies.
mongo_client = pymongo.MongoClient('mongodb://127.0.0.1:27017/')
mongo_db = mongo_client['admin']
mongo_collection = mongo_db['mongodb']
database = "test"
user = "system"
password = "1"
host = "127.0.0.1"
port = "54322"
conn = ksycopg2.connect(database=database, user=user, password=password, host=host, port=port)
cursor = conn.cursor()
try:
    # Insert into Kingbase, remembering the MD5 of each binary image by id.
    insert_query = "INSERT INTO zzwj(_id, image) VALUES (%s, %s)"
    mongo_hashes = {}
    for data in mongo_collection.find():
        raw_id = data['_id']
        # The driver cannot adapt bson ObjectId; store its string form.
        id_value = str(raw_id)
        image_data = data['image']
        if isinstance(image_data, bytes):
            mongo_hashes[id_value] = compute_hash(image_data)
        if raw_id and image_data:
            cursor.execute(insert_query, (id_value, image_data))
    conn.commit()

    # Compare per id. A plain "select ... from zzwj" has no defined row order,
    # so printing two separate hash lists cannot be matched up reliably.
    mismatches = 0
    for id_value, mongo_hash in mongo_hashes.items():
        cursor.execute("select image from zzwj where _id = %s", (id_value,))
        rows = cursor.fetchall()
        if not rows:
            mismatches += 1
            print(f"{id_value} mongo={mongo_hash} kingbase=<missing> MISMATCH")
            continue
        for (image_byte,) in rows:
            pg_hash = compute_hash(image_byte) if image_byte is not None else None
            matched = pg_hash == mongo_hash
            if not matched:
                mismatches += 1
            print(f"{id_value} mongo={mongo_hash} kingbase={pg_hash} "
                  f"{'OK' if matched else 'MISMATCH'}")
    print(f"checked {len(mongo_hashes)} ids, {mismatches} mismatches")
finally:
    # Close connections even if an insert or query fails.
    cursor.close()
    conn.close()
    mongo_client.close()