Migrating MongoDB Data to a Kingbase Database with a Python Script - 惟-koko
Source: cnblogs  Author: 惟-koko  Date: 2024/5/31 20:33:42

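1. First, establish how MongoDB concepts map to Kingbase: a collection corresponds to a table, and a field corresponds to a column. Create the target table based on this mapping.

The MongoDB data in this migration has two fields: _id (an auto-generated ObjectId) and image (a document stored as binary).

So create the table in Kingbase: create table admin(id varchar, image bytea);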
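As a rough illustration of this mapping, the sketch below turns one MongoDB document into the (id, image) pair for the table above. The helper name `doc_to_row` is hypothetical and does not appear in the original script; it assumes `_id` can be turned into text with `str()` (as an ObjectId can) and that `image` is a bytes-like value.

```python
def doc_to_row(doc):
    """Map one MongoDB document to an (id, image) row for the Kingbase table.

    Hypothetical helper for illustration; assumes the document carries the
    two fields described above.
    """
    # An ObjectId is not a SQL type, so store its string form in the varchar column.
    row_id = str(doc["_id"])
    # bytes() accepts bytes, bytearray and memoryview values.
    image = bytes(doc["image"])
    return row_id, image


# Usage with a stand-in document (a real _id would be a bson ObjectId).
sample = {"_id": "665f1c2e9b1d4a0001a1b2c3", "image": bytearray(b"\x89PNG")}
print(doc_to_row(sample))
```

Doing the conversion in one place keeps the insert statement free of MongoDB-specific types.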

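2. Install the Python environment. Because this is an intranet environment with no yum repository, the required installation packages must be downloaded beforehand on a machine that can reach the internet.

Python: version 3.9.0

The following packages are used: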

import pymongo
import ksycopg2
import concurrent.futures
from ksycopg2 import pool
import logging
from urllib.parse import quote_plus

------------------------------------------------------------------------------------

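pip download pymongo -d pymongo_packages    # download the pymongo package (on the internet-connected machine)

pip3 install --no-index --find-links=. pymongo    # install pymongo offline (run from the directory holding the downloaded files)

The Kingbase Python driver can be downloaded from the Kingbase official website; pick the build that matches your Python version.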
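After an offline install it can help to confirm that the modules are actually importable before running the migration. The snippet below is a minimal sketch using only the standard library; the function name `missing_modules` is an illustrative assumption, and the module list should be adjusted to whichever driver is installed.

```python
import importlib.util


def missing_modules(names):
    """Return the top-level module names from `names` that cannot be found."""
    return [name for name in names if importlib.util.find_spec(name) is None]


# Third-party modules named in the package list above.
required = ["pymongo", "ksycopg2"]
missing = missing_modules(required)
if missing:
    print("Not installed:", ", ".join(missing))
else:
    print("All required modules are importable.")
```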

The Python script is as follows:

import pymongo
import psycopg2  # or the Kingbase driver ksycopg2 from the package list above, whichever is installed
import concurrent.futures
from psycopg2 import pool
import logging
from urllib.parse import quote_plus
import os

# Initialize logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')

# MongoDB settings
username = 'admin'
password = 'SCJGscjg@123'
host = '10.253.228.41'
port = '27017'
# Percent-encode the credentials so characters such as '@' are safe inside the URI
encoded_username = quote_plus(username)
encoded_password = quote_plus(password)
uri = f"mongodb://{encoded_username}:{encoded_password}@{host}:{port}/"
mongo_client = pymongo.MongoClient(uri)
mongo_db = mongo_client['admin']
mongo_collection = mongo_db['admin']

# Connection pool settings
kb_pool = psycopg2.pool.ThreadedConnectionPool(
    minconn=1,
    maxconn=20,
    host="10.253.228.110",
    database="mongo",
    user="system",
    password="1",
    port="54322"
)

# File that stores the offset
OFFSET_FILE = 'offset.txt'


def read_offset():
    # Resume from the saved offset if one exists, otherwise start at 0
    if os.path.exists(OFFSET_FILE):
        with open(OFFSET_FILE, 'r') as f:
            return int(f.read().strip())
    return 0


def write_offset(offset):
    with open(OFFSET_FILE, 'w') as f:
        f.write(str(offset))


def batch_insert(mongo_data):
    kb_conn = None
    try:
        kb_conn = kb_pool.getconn()
        with kb_conn.cursor() as kb_cursor:
            for data in mongo_data:
                # The driver cannot adapt an ObjectId, so store it as a string
                id_value = str(data['_id'])
                image_data = data['image']
                # The table name must match the table created in Kingbase
                insert_query = "INSERT INTO dzzzwj(id, image) VALUES (%s, %s)"
                kb_cursor.execute(insert_query, (id_value, image_data))
        # Commit once per batch
        kb_conn.commit()
        return True
    except Exception as e:
        logging.error(f"Batch insert error: {e}")
        return False
    finally:
        if kb_conn:
            kb_pool.putconn(kb_conn)


def main():
    batch_size = 80
    offset = read_offset()
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=8)
    try:
        while True:
            # Sort by _id so that skip/limit pages stay stable across restarts
            mongo_data = list(mongo_collection.find().sort('_id', pymongo.ASCENDING).skip(offset).limit(batch_size))
            if not mongo_data:
                break
            future = executor.submit(batch_insert, mongo_data)
            # Wait for the batch so the saved offset only ever covers committed rows
            if future.result():
                logging.info(f"Batch completed with offset {offset}")
                offset += batch_size
                write_offset(offset)
            else:
                logging.error(f"Batch failed with offset {offset}")
                # Stop rather than retry the same batch forever; rerun to resume from offset.txt
                break
    except Exception as e:
        logging.error(f"Main loop error: {e}")
    finally:
        executor.shutdown(wait=True)
        mongo_client.close()
        kb_pool.closeall()
        logging.info("Resources have been cleaned up.")


if __name__ == "__main__":
    main()
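The script percent-encodes the MongoDB username and password before building the connection URI, because characters such as '@' or ':' in a password would otherwise be read as URI delimiters. A small sketch of that step in isolation follows; the function name `build_mongo_uri` and the credentials are placeholders, not values from the original.

```python
from urllib.parse import quote_plus


def build_mongo_uri(username, password, host, port):
    """Build a mongodb:// URI with percent-encoded credentials."""
    return f"mongodb://{quote_plus(username)}:{quote_plus(password)}@{host}:{port}/"


# Placeholder credentials; '@' becomes %40 and ':' becomes %3A.
print(build_mongo_uri("admin", "p@ss:word", "127.0.0.1", 27017))
# prints mongodb://admin:p%40ss%3Aword@127.0.0.1:27017/
```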

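How this code works:

(1) Connect to MongoDB and to the Kingbase database;

(2) Because the MongoDB data volume is fairly large and the migration must be resumable, the data is read using pagination and sorting;

(3) After a batch has been inserted into Kingbase successfully, the offset is increased and the current offset is recorded in offset.txt, so that if the script stops it can be restarted and continue migrating from where it left off.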
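The resume logic in steps (2) and (3) can be sketched independently of either database. In the example below a plain list stands in for the sorted MongoDB collection and a callback stands in for the Kingbase batch insert; `migrate` and the two callbacks are hypothetical names. As one possible hardening, which the original script does not do, the checkpoint is written to a temporary file and then renamed over the old one.

```python
import os
import tempfile


def read_offset(path):
    """Return the saved offset, or 0 when no checkpoint exists yet."""
    if os.path.exists(path):
        with open(path, "r") as f:
            return int(f.read().strip())
    return 0


def write_offset(path, offset):
    """Write the offset to a temporary file, then rename it over the checkpoint."""
    tmp_path = path + ".tmp"
    with open(tmp_path, "w") as f:
        f.write(str(offset))
    os.replace(tmp_path, path)  # the old checkpoint stays intact until the rename


def migrate(records, insert_batch, offset_path, batch_size):
    """Copy `records` in batches, saving the offset after each successful batch."""
    offset = read_offset(offset_path)
    while offset < len(records):
        batch = records[offset:offset + batch_size]
        if not insert_batch(batch):
            return False  # stop; the next run resumes from the saved offset
        offset += len(batch)
        write_offset(offset_path, offset)
    return True


# Usage: simulate a failure on the third batch, then a successful rerun.
records = list(range(10))
target = []
checkpoint = os.path.join(tempfile.mkdtemp(), "offset.txt")


def fail_on_third_batch(batch):
    if 6 in batch:
        return False
    target.extend(batch)
    return True


def always_succeed(batch):
    target.extend(batch)
    return True


migrate(records, fail_on_third_batch, checkpoint, batch_size=3)  # stops after 6 rows
migrate(records, always_succeed, checkpoint, batch_size=3)       # resumes at offset 6
print(target == records)  # prints True
```

Because the offset is only advanced after a batch succeeds, a restart neither skips rows nor inserts an already committed batch twice, provided the source order is stable between runs.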

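Because the binary data looks different when queried from MongoDB and from Kingbase, the following simple script computes MD5 hashes of the data on both sides so they can be compared.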

import pymongo
import ksycopg2
import hashlib


def compute_hash(data):
    # MD5 hex digest of a bytes-like value
    return hashlib.md5(data).hexdigest()


mongo_client = pymongo.MongoClient('mongodb://127.0.0.1:27017/')
mongo_db = mongo_client['admin']
mongo_collection = mongo_db['mongodb']

database = "test"
user = "system"
password = "1"
host = "127.0.0.1"
port = "54322"
conn = ksycopg2.connect(database=database, user=user, password=password, host=host, port=port)
cursor = conn.cursor()

mongo_data = mongo_collection.find()

# Insert into Kingbase, printing the MD5 of each MongoDB value along the way
for data in mongo_data:
    id_value = str(data['_id'])  # store the ObjectId as a string
    image_data = data['image']
    if isinstance(image_data, bytes):
        mongo_hash = compute_hash(image_data)
        print(id_value, mongo_hash)
    if id_value and image_data:
        insert_query = "INSERT INTO zzwj(_id, image) VALUES (%s, %s)"
        cursor.execute(insert_query, (id_value, image_data))

# Commit the transaction
conn.commit()

# Read the rows back from Kingbase and print the MD5 of each stored value
cursor.execute("select _id, image from zzwj")
rows = cursor.fetchall()
for row in rows:
    _id = row[0]
    image_byte = row[1]
    # bytes() also handles drivers that return bytea columns as memoryview
    pg_hash = compute_hash(bytes(image_byte))
    print(_id, pg_hash)

# Close the connections
cursor.close()
conn.close()
mongo_client.close()
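The script above prints the hashes for manual inspection. To compare them programmatically, one option is to key the digests by id and report any id whose digest differs or that exists on only one side. This is a sketch on stand-in data; `md5_by_id` and `find_mismatches` are hypothetical helpers, and the (id, payload) pairs are assumed to have been fetched from each database already.

```python
import hashlib


def md5_by_id(rows):
    """Map each id to the MD5 hex digest of its binary payload."""
    return {str(row_id): hashlib.md5(bytes(payload)).hexdigest() for row_id, payload in rows}


def find_mismatches(source_rows, target_rows):
    """Return the sorted ids whose digest differs or that exist on one side only."""
    source = md5_by_id(source_rows)
    target = md5_by_id(target_rows)
    return sorted(k for k in source.keys() | target.keys() if source.get(k) != target.get(k))


# Stand-in data: (id, payload) pairs as they might come back from each database.
mongo_rows = [("a1", b"\x00\x01"), ("a2", b"\xff"), ("a3", b"abc")]
kingbase_rows = [("a1", memoryview(b"\x00\x01")), ("a2", b"\xfe")]
print(find_mismatches(mongo_rows, kingbase_rows))  # prints ['a2', 'a3']
```

An empty result means every id is present on both sides with identical content.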

 

Original article: https://www.cnblogs.com/weisu-koko/p/18225227
