什么是ArcticDB
简介
ArcticDB,由英仕曼(ManGroup)开源的,Built for speed,Designed for Quants,的数据库。
ArcticDB,是一款很"年轻"的数据库,第一个正式版,发布于2023年5月。
英仕曼(ManGroup),全球最大的上市对冲基金公司。
特点
没有服务端
之前我们讨论的MySQL、MongoDB等数据库,都有服务端和客户端的概念。
例如:在服务器上安装的MySQL实例和基于JDBC协议的MySQL驱动Jar包、在服务器上安装的MongoDB和MongoDB官方提供的Python包pymongo。
因为ArcticDB没有服务端,所以有些观点认为,ArcticDB不是数据库,而是文件系统。
性能优异
能够存储、检索和处理大规模的数据。
灵活
不需要数据模式(表结构)。
支持流式数据摄取。
双时态,除了每张表可以有时序索引,还存储表的之前版本的数据。
ArcticDB不支持事务。
当然不支持事务,这个数据库没有服务端,只有客户端。而事务是要基于锁实现的,并且一定是服务端上锁。
安装
正如上文所述,ArcticDB没有服务端,只有客户端。
只需要安装客户端,并且目前只有Python版本的客户端,安装命令如下:
截至2024年12月,ArcticDB,只支持Windows和Linux系统,不支持MacOS系统。
存储
分类
根据ArcticDB官方的说法,ArcticDB支持四种存储方式:
本地存储
基于LMDB协议
S31 2 3 ac = adb.Arctic('s3://MY_ENDPOINT:MY_BUCKET' )
S3是一种传输存储协议,不仅仅AmazonS3支持,阿里云的OSS、腾讯云的COS等,也支持S3协议。
Azure1 ac = adb.Arctic('azure://CA_cert_path=/etc/ssl/certs/ca-certificates.crt;BlobEndpoint=https://arctic.blob.core.windows.net;Container=acblob;SharedAccessSignature=sp=sig' )
内存1 ac_mem = adb.Arctic('mem://' )
但是,根据ArcticDB的源码,也支持MongoDB。
在ArcticDB/python/arcticdb/arctic.py
,我们会看到如下内容,其中有MongoLibraryAdapter
。
ArcticDB的上一代是Arctic,Arctic是一款依赖于MongoDB的数据库。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 【部分代码略】 class Arctic : """ Top-level library management class. Arctic instances can be configured against an S3 environment and enable the creation, deletion and retrieval of Arctic libraries. """ _LIBRARY_ADAPTERS = [ S3LibraryAdapter, LMDBLibraryAdapter, AzureLibraryAdapter, MongoLibraryAdapter, InMemoryLibraryAdapter, ] 【部分代码略】
本地存储
示例代码:
1 2 3 4 5 6 import arcticdb as adburi = "lmdb://demo" ac = adb.Arctic(uri) print(ac)
运行结果:
1 Arctic(config=LMDB(path=D:\ad\demo))
特别的,我们会看到新建了文件夹demo
。
库
常用操作:
创建库:create_library
列出所有的库:list_libraries
获取库:get_library
删除库:delete_library
是否有某个库:has_library
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import arcticdb as adburi = "lmdb://demo" ac = adb.Arctic(uri) library_name = "demo_db" print("创建前:" , ac.list_libraries()) cl = ac.create_library(name=library_name) print("创建:" , cl) print("创建后:" , ac.list_libraries()) print("获取库:" , ac.get_library(name=library_name))
运行结果:
1 2 3 4 创建前: [] 创建: Library(Arctic(config=LMDB(path=D:\ad\demo)), path=demo_db, storage=lmdb_storage) 创建后: ['demo_db'] 获取库: Library(Arctic(config=LMDB(path=D:\ad\demo)), path=demo_db, storage=lmdb_storage)
读写
方法
方法:
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 import arcticdb as adbimport pandas as pdimport numpy as npfrom datetime import datetimeuri = "lmdb://demo" ac = adb.Arctic(uri) library_name = "demo_db" if ac.has_library(library_name): library = ac.get_library(name=library_name) else : library = ac.create_library(name=library_name) cols = ['COL_%d' % i for i in range(50 )] df = pd.DataFrame(np.random.randint(0 , 50 , size=(25 , 50 )), columns=cols) df.index = pd.date_range(datetime(2024 , 1 , 1 , 5 ), periods=25 , freq="h" ) print(df.head(5 )) write_info = library.write('test_frame' , df) print('write_info:' , write_info) read_info = library.read('test_frame' ) print('read_info:' , read_info) read_data = read_info.data print(read_data.head(5 ))
运行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 COL_0 COL_1 COL_2 COL_3 ... COL_46 COL_47 COL_48 COL_49 2024-01-01 05:00:00 5 15 37 48 ... 33 40 13 36 2024-01-01 06:00:00 30 12 7 32 ... 33 18 5 28 2024-01-01 07:00:00 38 37 6 29 ... 41 29 30 26 2024-01-01 08:00:00 9 19 49 28 ... 14 25 2 33 2024-01-01 09:00:00 43 45 23 28 ... 23 0 20 26 [5 rows x 50 columns] write_info: VersionedItem(symbol='test_frame', library='demo_db', data=n/a, version=0, metadata=None, host='LMDB(path=D:\\ad\\demo)', timestamp=1735295749390643600) read_info: VersionedItem(symbol='test_frame', library='demo_db', data=<class 'pandas.core.frame.DataFrame'>, version=0, metadata=None, host='LMDB(path=D:\\ad\\demo)', timestamp=1735295749390643600) COL_0 COL_1 COL_2 COL_3 ... COL_46 COL_47 COL_48 COL_49 2024-01-01 05:00:00 5 15 37 48 ... 33 40 13 36 2024-01-01 06:00:00 30 12 7 32 ... 33 18 5 28 2024-01-01 07:00:00 38 37 6 29 ... 41 29 30 26 2024-01-01 08:00:00 9 19 49 28 ... 14 25 2 33 2024-01-01 09:00:00 43 45 23 28 ... 23 0 20 26 [5 rows x 50 columns]
解释说明:
write
方法和read
方法的返回,都有一个字段version
,版本号。
获取读取到的数据,需要data
属性。1 library.read('test_frame' ).data
查看Symbol(表)
library.list_symbols()
,列出当前库中所有的Symbol(表)。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 import arcticdb as adburi = "lmdb://demo" ac = adb.Arctic(uri) library_name = "demo_db" if ac.has_library(library_name): library = ac.get_library(name=library_name) else : library = ac.create_library(name=library_name) print(library.list_symbols())
运行结果:
切片
date_range
date_range
,read
方法的一个参数,用于指定时间范围。
其特点有:
只对time-indexed的DataFrame生效
左右两个参数都是闭区间。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 from datetime import datetimeimport arcticdb as adburi = "lmdb://demo" ac = adb.Arctic(uri) library_name = "demo_db" if ac.has_library(library_name): library = ac.get_library(name=library_name) else : library = ac.create_library(name=library_name) start_date = datetime.strptime('2024-01-01 05:00:00' , '%Y-%m-%d %H:%M:%S' ) end_date = datetime.strptime('2024-01-01 09:00:00' , '%Y-%m-%d %H:%M:%S' ) df = library.read('test_frame' , date_range=(start_date,end_date)).data print(df)
运行结果:
1 2 3 4 5 6 7 8 COL_0 COL_1 COL_2 COL_3 ... COL_46 COL_47 COL_48 COL_49 2024-01-01 05:00:00 5 15 37 48 ... 33 40 13 36 2024-01-01 06:00:00 30 12 7 32 ... 33 18 5 28 2024-01-01 07:00:00 38 37 6 29 ... 41 29 30 26 2024-01-01 08:00:00 9 19 49 28 ... 14 25 2 33 2024-01-01 09:00:00 43 45 23 28 ... 23 0 20 26 [5 rows x 50 columns]
row_range
row_range
,read
方法的一个参数,用于指定行。
其特点有:
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import arcticdb as adburi = "lmdb://demo" ac = adb.Arctic(uri) library_name = "demo_db" if ac.has_library(library_name): library = ac.get_library(name=library_name) else : library = ac.create_library(name=library_name) df = library.read('test_frame' , row_range=(0 ,5 )).data print(df)
运行结果:
1 2 3 4 5 6 7 8 COL_0 COL_1 COL_2 COL_3 ... COL_46 COL_47 COL_48 COL_49 2024-01-01 05:00:00 5 15 37 48 ... 33 40 13 36 2024-01-01 06:00:00 30 12 7 32 ... 33 18 5 28 2024-01-01 07:00:00 38 37 6 29 ... 41 29 30 26 2024-01-01 08:00:00 9 19 49 28 ... 14 25 2 33 2024-01-01 09:00:00 43 45 23 28 ... 23 0 20 26 [5 rows x 50 columns]
解释说明:读取第0行到第5行,不包括第5行;即左闭右开。
columns
columns
,read
方法的一个参数,用于指定列。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 import arcticdb as adburi = "lmdb://tmp/arcticdb_intro" ac = adb.Arctic(uri) library_name = "my_library_name" library = ac.get_library(library_name) df = library.read('test_frame' , row_range=(0 ,5 ), columns=['COL_0' , 'COL_1' ]).data print(df)
运行结果:
1 2 3 4 5 6 COL_0 COL_1 2024-01-01 05:00:00 5 15 2024-01-01 06:00:00 30 12 2024-01-01 07:00:00 38 37 2024-01-01 08:00:00 9 19 2024-01-01 09:00:00 43 45
lazy
lazy
参数,read
方法的一个参数,用于控制查询的执行,默认为False
。
当lazy=True
时,查询不会立即执行,而是被推迟到调用LazyDataFrame
对象上的collect
方法时才执行。
基于此,我们可以对LazyDataFrame
对象构建一系列转换和过滤操作,同时不必在每个步骤中都触发实际的数据处理,从而提高性能。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import arcticdb as adburi = "lmdb://demo" ac = adb.Arctic(uri) library_name = "demo_db" if ac.has_library(library_name): library = ac.get_library(name=library_name) else : library = ac.create_library(name=library_name) lazy_df = library.read('test_frame' , row_range=(0 ,5 ), columns=['COL_30' , 'COL_31' ], lazy=True ) lazy_df = lazy_df[(lazy_df["COL_30" ] > 10 ) & (lazy_df["COL_31" ] < 100 )] print(lazy_df) df = lazy_df.collect().data print(df) tdf = library.read('test_frame' , columns=['COL_30' , 'COL_31' ]).data tdf = tdf[(tdf["COL_30" ] > 10 ) & (tdf["COL_31" ] < 100 )] print(tdf.shape)
运行结果:
1 2 3 4 5 6 LazyDataFrame(ReadRequest(symbol=test_frame, row_range=(0, 5), columns=['COL_30', 'COL_31']) | WHERE ((Column["COL_30"] GT Num(10)) AND (Column["COL_31"] LT Num(100)))) COL_30 COL_31 2024-01-01 06:00:00 20 0 2024-01-01 07:00:00 20 24 2024-01-01 08:00:00 34 36 2024-01-01 09:00:00 38 12
解释说明:
lazy_df
只是一个LazyDataFrame
对象,只有执行collect()
,才会真正去查询数据。
特别的,我们查询了一个tdf
,发现符合(COL_30 > 10) & (COL_31 < 100)
的数据不止4条。
也就是说,上文的row_range
,并不是SQL中的limit的作用,几个条件之间有先后顺序
的且
的关系。
如果我们查询(COL_30 > 10) & (COL_31 < 100)
的前5行,可以这么写:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import arcticdb as adburi = "lmdb://demo" ac = adb.Arctic(uri) library_name = "demo_db" if ac.has_library(library_name): library = ac.get_library(name=library_name) else : library = ac.create_library(name=library_name) lazy_df = library.read('test_frame' , columns=['COL_30' , 'COL_31' ], lazy=True ) lazy_df = lazy_df[(lazy_df["COL_30" ] > 10 ) & (lazy_df["COL_31" ] < 100 )] lazy_df = lazy_df.head(5 ) print(lazy_df) df = lazy_df.collect().data print(df) tdf = library.read('test_frame' , columns=['COL_30' , 'COL_31' ]).data tdf = tdf[(tdf["COL_30" ] > 10 ) & (tdf["COL_31" ] < 100 )] print(tdf.shape)
运行结果:
1 2 3 4 5 6 7 8 LazyDataFrame(ReadRequest(symbol=test_frame, columns=['COL_30', 'COL_31']) | WHERE ((Column["COL_30"] GT Num(10)) AND (Column["COL_31"] LT Num(100))) | ROWRANGE: HEAD, n=5) COL_30 COL_31 2024-01-01 06:00:00 20 0 2024-01-01 07:00:00 20 24 2024-01-01 08:00:00 34 36 2024-01-01 09:00:00 38 12 2024-01-01 10:00:00 31 39 (18, 2)
修改、版本控制
append
append
,Library
对象的方法,对指定的Symbol
(表),追加数据。
其特点有:
追加操作始终沿索引进行。
每次追加都会创建一个新版本来引用新追加的数据。
追加仅接受其第一行索引等于或大于现有数据最后一行索引的数据。
在下文的例子中:最后一行的索引是2024-01-02 05:00:00
,我们添加索引大于2024-01-02 05:00:00
的数据会成功。如果我们添加索引不大于2024-01-02 05:00:00
会失败,具体可以自己试一试。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 from datetime import datetimeimport arcticdb as adbimport numpy as npimport pandas as pduri = "lmdb://demo" ac = adb.Arctic(uri) library_name = "demo_db" if ac.has_library(library_name): library = ac.get_library(name=library_name) else : library = ac.create_library(name=library_name) print(library.tail('test_frame' , 5 ).data) random_data = np.random.randint(0 , 50 , size=(3 , 50 )) df_append = pd.DataFrame(random_data, columns=['COL_%d' % i for i in range(50 )]) df_append.index = pd.date_range(datetime(2024 , 12 , 1 , 7 ), periods=3 , freq="h" ) append_info = library.append('test_frame' , df_append) print(append_info) print(library.tail('test_frame' , 5 ).data)
运行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 COL_0 COL_1 COL_2 COL_3 ... COL_46 COL_47 COL_48 COL_49 2024-01-02 01:00:00 39 8 7 24 ... 9 2 26 20 2024-01-02 02:00:00 48 40 35 40 ... 14 45 27 38 2024-01-02 03:00:00 11 22 32 23 ... 41 14 22 24 2024-01-02 04:00:00 36 49 5 14 ... 19 4 7 46 2024-01-02 05:00:00 8 35 31 2 ... 33 29 31 12 [5 rows x 50 columns] VersionedItem(symbol='test_frame', library='demo_db', data=n/a, version=1, metadata=None, host='LMDB(path=D:\\ad\\demo)', timestamp=1735297630804373900) COL_0 COL_1 COL_2 COL_3 ... COL_46 COL_47 COL_48 COL_49 2024-01-02 04:00:00 36 49 5 14 ... 19 4 7 46 2024-01-02 05:00:00 8 35 31 2 ... 33 29 31 12 2024-12-01 07:00:00 1 45 37 42 ... 1 33 35 16 2024-12-01 08:00:00 49 47 25 1 ... 38 9 35 21 2024-12-01 09:00:00 18 38 45 41 ... 1 47 25 46 [5 rows x 50 columns]
update
update
,Library
对象的方法,更新指定的Symbol
(表)的数据。
其特点有:
从第一个索引条目到最后一个索引条目的整个范围将被入参的内容完全替换。
注意是整个范围,所以可能会删除某个索引条目,也可能会添加某个的索引条目。
必须是time-indexed的DataFrame。
如果data
没有row数据,则不会执行任何操作,也不会创建新版本。
所以,不能使用update
和date_range
仅删除数据的一部分。(可以使用delete_data_in_range
方法)。
在下文的代码中,我们用一个索引分别是2024-01-01 05:00:00
、2024-01-01 07:00:00
和2024-01-01 09:00:00
的DataFrame去更新原来的DataFrame,发现,从第一个索引2024-01-01 05:00:00
到最后一个2024-01-01 09:00:00
的数据都被更新了,并且原来的2024-01-01 06:00:00
和2024-01-01 08:00:00
被删除了。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 from datetime import datetimeimport arcticdb as adbimport numpy as npimport pandas as pduri = "lmdb://demo" ac = adb.Arctic(uri) library_name = "demo_db" if ac.has_library(library_name): library = ac.get_library(name=library_name) else : library = ac.create_library(name=library_name) print(library.head('test_frame' , 5 ).data) random_data = np.random.randint(0 , 50 , size=(3 , 50 )) df_update = pd.DataFrame(random_data, columns=['COL_%d' % i for i in range(50 )]) df_update.index = pd.date_range(datetime(2024 , 1 , 1 , 5 ), periods=3 , freq="2h" ) print(df_update) update_info = library.update('test_frame' , df_update) print(update_info) print(library.head('test_frame' , 5 ).data)
运行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 COL_0 COL_1 COL_2 COL_3 ... COL_46 COL_47 COL_48 COL_49 2024-01-01 05:00:00 7 46 17 9 ... 48 7 0 4 2024-01-01 06:00:00 17 30 21 44 ... 28 30 6 12 2024-01-01 07:00:00 27 48 34 29 ... 45 30 39 36 2024-01-01 08:00:00 1 46 31 49 ... 7 14 27 36 2024-01-01 09:00:00 34 48 23 8 ... 15 32 33 5 [5 rows x 50 columns] COL_0 COL_1 COL_2 COL_3 ... COL_46 COL_47 COL_48 COL_49 2024-01-01 05:00:00 14 44 17 36 ... 44 21 40 10 2024-01-01 07:00:00 13 13 29 43 ... 45 1 16 8 2024-01-01 09:00:00 20 0 36 21 ... 32 36 32 23 [3 rows x 50 columns] VersionedItem(symbol='test_frame', library='demo_db', data=n/a, version=1, metadata=None, host='LMDB(path=D:\\ad\\demo)', timestamp=1735298719295936200) COL_0 COL_1 COL_2 COL_3 ... COL_46 COL_47 COL_48 COL_49 2024-01-01 05:00:00 14 44 17 36 ... 44 21 40 10 2024-01-01 07:00:00 13 13 29 43 ... 45 1 16 8 2024-01-01 09:00:00 20 0 36 21 ... 32 36 32 23 2024-01-01 10:00:00 10 37 1 40 ... 47 6 11 2 2024-01-01 11:00:00 15 0 19 35 ... 12 43 27 19 [5 rows x 50 columns]
版本控制
版本控制,也有资料称之为时间旅行。
在上文,我们已经发现了,VersionedItem其中一个属性是version
,版本号。
我们可以基于as_of
参数,指定版本号。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import arcticdb as adburi = "lmdb://demo" ac = adb.Arctic(uri) library_name = "demo_db" if ac.has_library(library_name): library = ac.get_library(name=library_name) else : library = ac.create_library(name=library_name) print(library.head('test_frame' , 5 , as_of=0 ).data) print(library.head('test_frame' , 5 , as_of=1 ).data)
运行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 COL_0 COL_1 COL_2 COL_3 ... COL_46 COL_47 COL_48 COL_49 2024-01-01 05:00:00 7 46 17 9 ... 48 7 0 4 2024-01-01 06:00:00 17 30 21 44 ... 28 30 6 12 2024-01-01 07:00:00 27 48 34 29 ... 45 30 39 36 2024-01-01 08:00:00 1 46 31 49 ... 7 14 27 36 2024-01-01 09:00:00 34 48 23 8 ... 15 32 33 5 [5 rows x 50 columns] COL_0 COL_1 COL_2 COL_3 ... COL_46 COL_47 COL_48 COL_49 2024-01-01 05:00:00 14 44 17 36 ... 44 21 40 10 2024-01-01 07:00:00 13 13 29 43 ... 45 1 16 8 2024-01-01 09:00:00 20 0 36 21 ... 32 36 32 23 2024-01-01 10:00:00 10 37 1 40 ... 47 6 11 2 2024-01-01 11:00:00 15 0 19 35 ... 12 43 27 19 [5 rows x 50 columns]
特别的,默认情况下,写入、追加和更新操作不会移除之前的版本,所以这将会占用更多的空间。
我们可以在写入、追加或者更新的时候,通过prune_previous_versions
参数定义是否移除之前的版本。
索引
分类
四种索引
ArcticDB支持以下索引类型:
Int64Index或UInt64Index
RangeIndex
DatetimeIndex
由上述支持类型的组成的MultiIndex
Int64Index
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 import pandas as pdindex = pd.Index([10 , 20 , 30 ], name='example_index' ) df_int64 = pd.DataFrame({ 'A' : ['foo' , 'bar' , 'baz' ], 'B' : [1 , 2 , 3 ] }, index=index) print(df_int64)
运行结果:
1 2 3 4 5 A B example_index 10 foo 1 20 bar 2 30 baz 3
DatetimeIndex
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import pandas as pddates = pd.date_range(start="2023-01-01" , periods=3 , freq='D' ) index = pd.DatetimeIndex(dates, name='date' ) df_datetime = pd.DataFrame({ 'temperature' : [22.5 , 24.0 , 23.0 ], 'humidity' : [55 , 60 , 65 ] }, index=index) print("\nDatetimeIndex DataFrame:" ) print(df_datetime)
运行结果:
1 2 3 4 5 6 DatetimeIndex DataFrame: temperature humidity date 2023-01-01 22.5 55 2023-01-02 24.0 60 2023-01-03 23.0 65
RangeIndex
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 import pandas as pddf_range = pd.DataFrame({ 'values' : [100 , 200 , 300 ] }) print("RangeIndex DataFrame (default):" ) print(df_range) range_index = pd.RangeIndex(start=10 , stop=13 , step=1 , name='sequence' ) df_explicit_range = pd.DataFrame({ 'letters' : ['x' , 'y' , 'z' ] }, index=range_index) print("Explicit RangeIndex DataFrame:" ) print(df_explicit_range)
输出将是:
1 2 3 4 5 6 7 8 9 10 11 RangeIndex DataFrame (default): values 0 100 1 200 2 300 Explicit RangeIndex DataFrame: letters sequence 10 x 11 y 12 z
MultiIndex
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import pandas as pdarrays = [ [1 , 1 , 2 , 2 ], pd.date_range('2023-01-01' , periods=4 ) ] multi_index = pd.MultiIndex.from_arrays(arrays, names=('group' , 'date' )) df_multi = pd.DataFrame({ 'sales' : [1000 , 1500 , 2000 , 2500 ] }, index=multi_index) print("MultiIndex DataFrame:" ) print(df_multi)
运行结果:
1 2 3 4 5 6 7 8 9 MultiIndex DataFrame: sales group date 1 2023-01-01 1000 2023-01-02 1500 2 2023-01-03 2000 2023-01-04 2500 Process finished with exit code 0
自动创建
在ArcticDB中创建索引的方式与传统关系型数据库不同,不需要我们手动创建索引,也没有所谓的创建索引的语句或方法,而是根据我们传入的DataFrame,自动创建。
并行写入
并行写入,也被称为分阶段写入,是指我们并发的往一个Symbol(表)中写入数据。
其特点有:
写入的时候调用write
方法,传入参数staged
,值为True
。
写入完成后,必须调用library.finalize_staged_data()
。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 import arcticdb as adbimport pandas as pdimport numpy as npfrom datetime import datetimefrom joblib import Parallel, delayeduri = "lmdb://demo" ac = adb.Arctic(uri) library_name = "demo_db_parallel" if ac.has_library(library_name): library = ac.get_library(name=library_name) else : library = ac.create_library(name=library_name) cols = ['COL_%d' % i for i in range(50 )] df = pd.DataFrame(np.random.randint(0 , 50 , size=(25 , 50 )), columns=cols) df.index = pd.date_range(datetime(2024 , 1 , 1 , 5 ), periods=25 , freq="h" ) def write_chunk (chunk_df, chunk_id) : print(chunk_id) library.write('test_df' , chunk_df, staged=True ) def parallel_write (df_val, n_jobs=-1 ) : chunks = [df_val[i:i+5 ] for i in range(0 , len(df_val), 5 )] results = Parallel(n_jobs=n_jobs)( delayed(write_chunk)(chunk, idx) for idx, chunk in enumerate(chunks) ) return results parallel_write(df) library.finalize_staged_data('test_df' ) print(library.read('test_df' )) print(library.read('test_df' ).data)
快照
什么是快照
快照,Snapshot,保存数据的一个固定版本,将来可以恢复到这个版本。
其特点有:
创建快照是一个轻量级的操作,不会影响正在进行的读取或写入操作。
当有多个快照时候,第二个快照只存储与上一个快照相比发生变化的数据,相对节省存储空间。
操作
创建快照
snapshot
,Library
对象的方法,创建快照。
有四个参数:
snapshot_name
metadata
元数据
skip_symbols
,List[str]
,跳过的Symbol(表)
versions
,Dict[str, int]
,例如versions={"a": 2, "b": 3}
,快照2版本的a
、3版本的b
。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import arcticdb as adbimport pandas as pdimport numpy as npfrom datetime import datetimeuri = "lmdb://demo" ac = adb.Arctic(uri) library_name = "sdb" if ac.has_library(library_name): library = ac.get_library(name=library_name) else : library = ac.create_library(name=library_name) cols = ['COL_%d' % i for i in range(10 )] df = pd.DataFrame(np.random.randint(0 , 50 , size=(10 , 10 )), columns=cols) df.index = pd.date_range(datetime(2024 , 1 , 1 , 5 ), periods=10 , freq="h" ) library.write('test_frame' , df) library.snapshot("snapshot_name" )
列出所有快照
list_snapshots
,Library
对象的方法,其中一个参数是load_metadata
,决定了是否加载每个快照的元数据(metadata)。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import arcticdb as adburi = "lmdb://demo" ac = adb.Arctic(uri) library_name = "sdb" if ac.has_library(library_name): library = ac.get_library(name=library_name) else : library = ac.create_library(name=library_name) snapshot_list = library.list_snapshots() print(snapshot_list)
运行结果:
解释说明:本文我们没有定义快照的元数据。
读取快照
基于read
的as_of
参数指定快照。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import arcticdb as adburi = "lmdb://demo" ac = adb.Arctic(uri) library_name = "sdb" if ac.has_library(library_name): library = ac.get_library(name=library_name) else : library = ac.create_library(name=library_name) library.delete('test_frame' ) try : print(library.read(symbol='test_frame' ).data) except Exception: print(Exception) print(library.read(symbol='test_frame' ,as_of='snapshot_name' ).data)
运行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 20241227 22:42:46.889488 19816 W arcticdb | Nothing to delete for symbol 'test_frame' <class 'Exception'> COL_0 COL_1 COL_2 COL_3 ... COL_6 COL_7 COL_8 COL_9 2024-01-01 05:00:00 16 44 26 16 ... 29 40 34 45 2024-01-01 06:00:00 6 43 25 14 ... 16 24 33 38 2024-01-01 07:00:00 35 17 22 49 ... 5 22 33 28 2024-01-01 08:00:00 20 24 29 13 ... 45 10 45 14 2024-01-01 09:00:00 21 41 35 3 ... 32 7 4 41 2024-01-01 10:00:00 29 42 45 0 ... 6 14 15 38 2024-01-01 11:00:00 27 42 38 15 ... 38 4 9 42 2024-01-01 12:00:00 23 3 5 8 ... 47 6 22 2 2024-01-01 13:00:00 20 35 4 11 ... 7 6 22 45 2024-01-01 14:00:00 18 11 16 13 ... 47 23 41 5 [10 rows x 10 columns]
删除快照
delete_snapshot
,Library
对象的方法,删除快照。
元数据
什么是元数据
元数据,Metadata,辅助信息,元数据可以用来描述数据的结构、内容、质量以及其他属性,帮助用户更好地理解和处理这些数据。
操作
添加元数据
两种方法:
write
写入数据时,可以通过metadata
参数传递一个字典形式的元数据。
write_metadata
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 from datetime import datetimeimport arcticdb as adbimport numpy as npimport pandas as pduri = "lmdb://demo" ac = adb.Arctic(uri) library_name = "mdb" if ac.has_library(library_name): library = ac.get_library(name=library_name) else : library = ac.create_library(name=library_name) cols = ['COL_%d' % i for i in range(10 )] df = pd.DataFrame(np.random.randint(0 , 50 , size=(10 , 10 )), columns=cols) df.index = pd.date_range(datetime(2024 , 1 , 1 , 5 ), periods=10 , freq="h" ) metadata = { 'created_by' : 'data_engineer' , 'description' : 'Test dataset for demonstration purposes.' , 'version' : '1.0' } library.write('test_symbol' , df, metadata=metadata)
获取元数据
两种方法:
library.read('test_symbol').metadata
library.read_metadata('test_symbol')
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import arcticdb as adburi = "lmdb://demo" ac = adb.Arctic(uri) library_name = "mdb" if ac.has_library(library_name): library = ac.get_library(name=library_name) else : library = ac.create_library(name=library_name) print(library.read('test_symbol' ).metadata) print(library.read_metadata('test_symbol' ))
更新元数据
再写一遍,即可更新。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import arcticdb as adburi = "lmdb://demo" ac = adb.Arctic(uri) library_name = "mdb" if ac.has_library(library_name): library = ac.get_library(name=library_name) else : library = ac.create_library(name=library_name) new_metadata = { 'created_by' : 'data_engineer' , 'description' : 'Updated test dataset with more information.' , 'version' : '2.0' } library.write_metadata(symbol='test_symbol' ,metadata=new_metadata) print(library.read_metadata(symbol='test_symbol' ))