avatar


ArcticDB (为量化而设计的数据库)

什么是ArcticDB

简介

ArcticDB,由英仕曼(ManGroup)开源的,Built for speed,Designed for Quants,的数据库。

ArcticDB,是一款很"年轻"的数据库,第一个正式版,发布于2023年5月。

英仕曼(ManGroup),全球最大的上市对冲基金公司。

特点

  • 没有服务端
    • 之前我们讨论的MySQL、MongoDB等数据库,都有服务端和客户端的概念。
      例如:在服务器上安装的MySQL实例和基于JDBC协议的MySQL驱动Jar包、在服务器上安装的MongoDB和MongoDB官方提供的Python包pymongo。
    • 因为ArcticDB没有服务端,所以有些观点认为,ArcticDB不是数据库,而是文件系统。
  • 性能优异
    能够存储、检索和处理大规模的数据。
  • 灵活
    • 不需要数据模式(表结构)。
    • 支持流式数据摄取。
    • 双时态,除了每张表可以有时序索引,还存储表的之前版本的数据。

ArcticDB不支持事务。

当然不支持事务,这个数据库没有服务端,只有客户端。而事务是要基于锁实现的,并且一定是服务端上锁。

安装

正如上文所述,ArcticDB没有服务端,只有客户端。

只需要安装客户端,并且目前只有Python版本的客户端,安装命令如下:

1
pip install arcticdb

截至2024年12月,ArcticDB,只支持Windows和Linux系统,不支持MacOS系统。

存储

分类

根据ArcticDB官方的说法,ArcticDB支持四种存储方式:

  1. 本地存储
    基于LMDB协议
  2. S3
    1
    2
    3
    # Leave AWS to derive credential information
    ac = adb.Arctic('s3://MY_ENDPOINT:MY_BUCKET')
    # Manually specify credsac = adb.Arctic('s3://MY_ENDPOINT:MY_BUCKET?region=YOUR_REGION&access=ABCD&secret=DCBA')
    S3是一种传输存储协议,不仅仅AmazonS3支持,阿里云的OSS、腾讯云的COS等,也支持S3协议。
  3. Azure
    1
    ac = adb.Arctic('azure://CA_cert_path=/etc/ssl/certs/ca-certificates.crt;BlobEndpoint=https://arctic.blob.core.windows.net;Container=acblob;SharedAccessSignature=sp=sig')
  4. 内存
    1
    ac_mem = adb.Arctic('mem://')

但是,根据ArcticDB的源码,也支持MongoDB。

ArcticDB/python/arcticdb/arctic.py,我们会看到如下内容,其中有MongoLibraryAdapter

ArcticDB的上一代是Arctic,Arctic是一款依赖于MongoDB的数据库。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

【部分代码略】

class Arctic:
"""
Top-level library management class. Arctic instances can be configured against an S3 environment and enable the
creation, deletion and retrieval of Arctic libraries.
"""

_LIBRARY_ADAPTERS = [
S3LibraryAdapter,
LMDBLibraryAdapter,
AzureLibraryAdapter,
MongoLibraryAdapter,
InMemoryLibraryAdapter,
]

【部分代码略】

本地存储

示例代码:

1
2
3
4
5
6
import arcticdb as adb
# 这将使用本地文件系统设置存储
uri = "lmdb://demo"
ac = adb.Arctic(uri)

print(ac)

运行结果:

1
Arctic(config=LMDB(path=D:\ad\demo))

特别的,我们会看到新建了文件夹demo

D:\ad\demo

常用操作:

  • 创建库:create_library
  • 列出所有的库:list_libraries
  • 获取库:get_library
  • 删除库:delete_library
  • 是否有某个库:has_library

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import arcticdb as adb

uri = "lmdb://demo"
ac = adb.Arctic(uri)

library_name = "demo_db"

print("创建前:", ac.list_libraries())

cl = ac.create_library(name=library_name)
print("创建:", cl)

print("创建后:", ac.list_libraries())

print("获取库:", ac.get_library(name=library_name))

运行结果:

1
2
3
4
创建前: []
创建: Library(Arctic(config=LMDB(path=D:\ad\demo)), path=demo_db, storage=lmdb_storage)
创建后: ['demo_db']
获取库: Library(Arctic(config=LMDB(path=D:\ad\demo)), path=demo_db, storage=lmdb_storage)

读写

方法

方法:

  • write:读取数据。
  • read:写入数据。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import arcticdb as adb

import pandas as pd
import numpy as np
from datetime import datetime

uri = "lmdb://demo"
ac = adb.Arctic(uri)

library_name = "demo_db"

if ac.has_library(library_name):
library = ac.get_library(name=library_name)
else:
library = ac.create_library(name=library_name)

cols = ['COL_%d' % i for i in range(50)]
df = pd.DataFrame(np.random.randint(0, 50, size=(25, 50)), columns=cols)
df.index = pd.date_range(datetime(2024, 1, 1, 5), periods=25, freq="h")

print(df.head(5))

write_info = library.write('test_frame', df)
print('write_info:', write_info)

read_info = library.read('test_frame')
print('read_info:', read_info)

read_data = read_info.data
print(read_data.head(5))

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
                     COL_0  COL_1  COL_2  COL_3  ...  COL_46  COL_47  COL_48  COL_49
2024-01-01 05:00:00 5 15 37 48 ... 33 40 13 36
2024-01-01 06:00:00 30 12 7 32 ... 33 18 5 28
2024-01-01 07:00:00 38 37 6 29 ... 41 29 30 26
2024-01-01 08:00:00 9 19 49 28 ... 14 25 2 33
2024-01-01 09:00:00 43 45 23 28 ... 23 0 20 26

[5 rows x 50 columns]
write_info: VersionedItem(symbol='test_frame', library='demo_db', data=n/a, version=0, metadata=None, host='LMDB(path=D:\\ad\\demo)', timestamp=1735295749390643600)
read_info: VersionedItem(symbol='test_frame', library='demo_db', data=<class 'pandas.core.frame.DataFrame'>, version=0, metadata=None, host='LMDB(path=D:\\ad\\demo)', timestamp=1735295749390643600)
COL_0 COL_1 COL_2 COL_3 ... COL_46 COL_47 COL_48 COL_49
2024-01-01 05:00:00 5 15 37 48 ... 33 40 13 36
2024-01-01 06:00:00 30 12 7 32 ... 33 18 5 28
2024-01-01 07:00:00 38 37 6 29 ... 41 29 30 26
2024-01-01 08:00:00 9 19 49 28 ... 14 25 2 33
2024-01-01 09:00:00 43 45 23 28 ... 23 0 20 26

[5 rows x 50 columns]

解释说明:

  • write方法和read方法的返回,都有一个字段version,版本号。
  • 获取读取到的数据,需要data属性。
    1
    library.read('test_frame').data

查看Symbol(表)

library.list_symbols(),列出当前库中所有的Symbol(表)。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
import arcticdb as adb

uri = "lmdb://demo"
ac = adb.Arctic(uri)

library_name = "demo_db"

if ac.has_library(library_name):
library = ac.get_library(name=library_name)
else:
library = ac.create_library(name=library_name)

print(library.list_symbols())

运行结果:

1
['test_frame']

切片

date_range

date_rangeread方法的一个参数,用于指定时间范围。
其特点有:

  • 只对time-indexed的DataFrame生效
  • 左右两个参数都是闭区间。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from datetime import datetime

import arcticdb as adb

uri = "lmdb://demo"
ac = adb.Arctic(uri)

library_name = "demo_db"

if ac.has_library(library_name):
library = ac.get_library(name=library_name)
else:
library = ac.create_library(name=library_name)

# 将字符串转换为datetime对象
start_date = datetime.strptime('2024-01-01 05:00:00', '%Y-%m-%d %H:%M:%S')
end_date = datetime.strptime('2024-01-01 09:00:00', '%Y-%m-%d %H:%M:%S')

df = library.read('test_frame', date_range=(start_date,end_date)).data

print(df)

运行结果:

1
2
3
4
5
6
7
8
                     COL_0  COL_1  COL_2  COL_3  ...  COL_46  COL_47  COL_48  COL_49
2024-01-01 05:00:00 5 15 37 48 ... 33 40 13 36
2024-01-01 06:00:00 30 12 7 32 ... 33 18 5 28
2024-01-01 07:00:00 38 37 6 29 ... 41 29 30 26
2024-01-01 08:00:00 9 19 49 28 ... 14 25 2 33
2024-01-01 09:00:00 43 45 23 28 ... 23 0 20 26

[5 rows x 50 columns]

row_range

row_rangeread方法的一个参数,用于指定行。
其特点有:

  • 左闭右开。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import arcticdb as adb

uri = "lmdb://demo"
ac = adb.Arctic(uri)

library_name = "demo_db"

if ac.has_library(library_name):
library = ac.get_library(name=library_name)
else:
library = ac.create_library(name=library_name)


df = library.read('test_frame', row_range=(0,5)).data

print(df)

运行结果:

1
2
3
4
5
6
7
8
                     COL_0  COL_1  COL_2  COL_3  ...  COL_46  COL_47  COL_48  COL_49
2024-01-01 05:00:00 5 15 37 48 ... 33 40 13 36
2024-01-01 06:00:00 30 12 7 32 ... 33 18 5 28
2024-01-01 07:00:00 38 37 6 29 ... 41 29 30 26
2024-01-01 08:00:00 9 19 49 28 ... 14 25 2 33
2024-01-01 09:00:00 43 45 23 28 ... 23 0 20 26

[5 rows x 50 columns]

解释说明:读取第0行到第5行,不包括第5行;即左闭右开。

columns

columnsread方法的一个参数,用于指定列。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
import arcticdb as adb

uri = "lmdb://tmp/arcticdb_intro"
ac = adb.Arctic(uri)

library_name = "my_library_name"
library = ac.get_library(library_name)

df = library.read('test_frame', row_range=(0,5), columns=['COL_0', 'COL_1']).data

print(df)

运行结果:

1
2
3
4
5
6
                     COL_0  COL_1
2024-01-01 05:00:00 5 15
2024-01-01 06:00:00 30 12
2024-01-01 07:00:00 38 37
2024-01-01 08:00:00 9 19
2024-01-01 09:00:00 43 45

lazy

lazy参数,read方法的一个参数,用于控制查询的执行,默认为False

lazy=True时,查询不会立即执行,而是被推迟到调用LazyDataFrame对象上的collect方法时才执行。

基于此,我们可以对LazyDataFrame对象构建一系列转换和过滤操作,同时不必在每个步骤中都触发实际的数据处理,从而提高性能。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import arcticdb as adb

uri = "lmdb://demo"
ac = adb.Arctic(uri)

library_name = "demo_db"

if ac.has_library(library_name):
library = ac.get_library(name=library_name)
else:
library = ac.create_library(name=library_name)

lazy_df = library.read('test_frame', row_range=(0,5), columns=['COL_30', 'COL_31'], lazy=True)
lazy_df = lazy_df[(lazy_df["COL_30"] > 10) & (lazy_df["COL_31"] < 100)]

print(lazy_df)

df = lazy_df.collect().data

print(df)

tdf = library.read('test_frame', columns=['COL_30', 'COL_31']).data
tdf = tdf[(tdf["COL_30"] > 10) & (tdf["COL_31"] < 100)]
print(tdf.shape)

运行结果:

1
2
3
4
5
6
LazyDataFrame(ReadRequest(symbol=test_frame, row_range=(0, 5), columns=['COL_30', 'COL_31']) | WHERE ((Column["COL_30"] GT Num(10)) AND (Column["COL_31"] LT Num(100))))
COL_30 COL_31
2024-01-01 06:00:00 20 0
2024-01-01 07:00:00 20 24
2024-01-01 08:00:00 34 36
2024-01-01 09:00:00 38 12

解释说明:

  • lazy_df只是一个LazyDataFrame对象,只有执行collect(),才会真正去查询数据。
  • 特别的,我们查询了一个tdf,发现符合(COL_30 > 10) & (COL_31 < 100)的数据不止4条。
    也就是说,上文的row_range,并不是SQL中的limit的作用,几个条件之间有先后顺序的关系。

如果我们查询(COL_30 > 10) & (COL_31 < 100)的前5行,可以这么写:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import arcticdb as adb

uri = "lmdb://demo"
ac = adb.Arctic(uri)

library_name = "demo_db"

if ac.has_library(library_name):
library = ac.get_library(name=library_name)
else:
library = ac.create_library(name=library_name)

lazy_df = library.read('test_frame', columns=['COL_30', 'COL_31'], lazy=True)
lazy_df = lazy_df[(lazy_df["COL_30"] > 10) & (lazy_df["COL_31"] < 100)]
lazy_df = lazy_df.head(5)

print(lazy_df)

df = lazy_df.collect().data

print(df)

tdf = library.read('test_frame', columns=['COL_30', 'COL_31']).data
tdf = tdf[(tdf["COL_30"] > 10) & (tdf["COL_31"] < 100)]
print(tdf.shape)
运行结果:
1
2
3
4
5
6
7
8
LazyDataFrame(ReadRequest(symbol=test_frame, columns=['COL_30', 'COL_31']) | WHERE ((Column["COL_30"] GT Num(10)) AND (Column["COL_31"] LT Num(100))) | ROWRANGE: HEAD, n=5)
COL_30 COL_31
2024-01-01 06:00:00 20 0
2024-01-01 07:00:00 20 24
2024-01-01 08:00:00 34 36
2024-01-01 09:00:00 38 12
2024-01-01 10:00:00 31 39
(18, 2)

修改、版本控制

append

appendLibrary对象的方法,对指定的Symbol(表),追加数据。
其特点有:

  • 追加操作始终沿索引进行。
  • 每次追加都会创建一个新版本来引用新追加的数据。
  • 追加仅接受其第一行索引等于或大于现有数据最后一行索引的数据。

在下文的例子中:最后一行的索引是2024-01-02 05:00:00,我们添加索引大于2024-01-02 05:00:00的数据会成功。如果我们添加索引不大于2024-01-02 05:00:00会失败,具体可以自己试一试。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from datetime import datetime

import arcticdb as adb
import numpy as np
import pandas as pd

uri = "lmdb://demo"
ac = adb.Arctic(uri)

library_name = "demo_db"

if ac.has_library(library_name):
library = ac.get_library(name=library_name)
else:
library = ac.create_library(name=library_name)

print(library.tail('test_frame', 5).data)

random_data = np.random.randint(0, 50, size=(3, 50))
df_append = pd.DataFrame(random_data, columns=['COL_%d' % i for i in range(50)])
df_append.index = pd.date_range(datetime(2024, 12, 1, 7), periods=3, freq="h")

append_info = library.append('test_frame', df_append)

print(append_info)

print(library.tail('test_frame', 5).data)

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
                     COL_0  COL_1  COL_2  COL_3  ...  COL_46  COL_47  COL_48  COL_49
2024-01-02 01:00:00 39 8 7 24 ... 9 2 26 20
2024-01-02 02:00:00 48 40 35 40 ... 14 45 27 38
2024-01-02 03:00:00 11 22 32 23 ... 41 14 22 24
2024-01-02 04:00:00 36 49 5 14 ... 19 4 7 46
2024-01-02 05:00:00 8 35 31 2 ... 33 29 31 12

[5 rows x 50 columns]
VersionedItem(symbol='test_frame', library='demo_db', data=n/a, version=1, metadata=None, host='LMDB(path=D:\\ad\\demo)', timestamp=1735297630804373900)
COL_0 COL_1 COL_2 COL_3 ... COL_46 COL_47 COL_48 COL_49
2024-01-02 04:00:00 36 49 5 14 ... 19 4 7 46
2024-01-02 05:00:00 8 35 31 2 ... 33 29 31 12
2024-12-01 07:00:00 1 45 37 42 ... 1 33 35 16
2024-12-01 08:00:00 49 47 25 1 ... 38 9 35 21
2024-12-01 09:00:00 18 38 45 41 ... 1 47 25 46

[5 rows x 50 columns]

update

updateLibrary对象的方法,更新指定的Symbol(表)的数据。
其特点有:

  • 从第一个索引条目到最后一个索引条目的整个范围将被入参的内容完全替换。
    注意是整个范围,所以可能会删除某个索引条目,也可能会添加某个的索引条目。
  • 必须是time-indexed的DataFrame。
  • 如果data没有row数据,则不会执行任何操作,也不会创建新版本。
    所以,不能使用updatedate_range仅删除数据的一部分。(可以使用delete_data_in_range方法)。

在下文的代码中,我们用一个索引分别是2024-01-01 05:00:002024-01-01 07:00:002024-01-01 09:00:00的DataFrame去更新原来的DataFrame,发现,从第一个索引2024-01-01 05:00:00到最后一个2024-01-01 09:00:00的数据都被更新了,并且原来的2024-01-01 06:00:002024-01-01 08:00:00被删除了。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from datetime import datetime

import arcticdb as adb
import numpy as np
import pandas as pd

uri = "lmdb://demo"
ac = adb.Arctic(uri)

library_name = "demo_db"

if ac.has_library(library_name):
library = ac.get_library(name=library_name)
else:
library = ac.create_library(name=library_name)

print(library.head('test_frame', 5).data)

random_data = np.random.randint(0, 50, size=(3, 50))
df_update = pd.DataFrame(random_data, columns=['COL_%d' % i for i in range(50)])
df_update.index = pd.date_range(datetime(2024, 1, 1, 5), periods=3, freq="2h")

print(df_update)

update_info = library.update('test_frame', df_update)

print(update_info)

print(library.head('test_frame', 5).data)

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
                     COL_0  COL_1  COL_2  COL_3  ...  COL_46  COL_47  COL_48  COL_49
2024-01-01 05:00:00 7 46 17 9 ... 48 7 0 4
2024-01-01 06:00:00 17 30 21 44 ... 28 30 6 12
2024-01-01 07:00:00 27 48 34 29 ... 45 30 39 36
2024-01-01 08:00:00 1 46 31 49 ... 7 14 27 36
2024-01-01 09:00:00 34 48 23 8 ... 15 32 33 5

[5 rows x 50 columns]
COL_0 COL_1 COL_2 COL_3 ... COL_46 COL_47 COL_48 COL_49
2024-01-01 05:00:00 14 44 17 36 ... 44 21 40 10
2024-01-01 07:00:00 13 13 29 43 ... 45 1 16 8
2024-01-01 09:00:00 20 0 36 21 ... 32 36 32 23

[3 rows x 50 columns]
VersionedItem(symbol='test_frame', library='demo_db', data=n/a, version=1, metadata=None, host='LMDB(path=D:\\ad\\demo)', timestamp=1735298719295936200)
COL_0 COL_1 COL_2 COL_3 ... COL_46 COL_47 COL_48 COL_49
2024-01-01 05:00:00 14 44 17 36 ... 44 21 40 10
2024-01-01 07:00:00 13 13 29 43 ... 45 1 16 8
2024-01-01 09:00:00 20 0 36 21 ... 32 36 32 23
2024-01-01 10:00:00 10 37 1 40 ... 47 6 11 2
2024-01-01 11:00:00 15 0 19 35 ... 12 43 27 19

[5 rows x 50 columns]

版本控制

版本控制,也有资料称之为时间旅行。

在上文,我们已经发现了,VersionedItem其中一个属性是version,版本号。

我们可以基于as_of参数,指定版本号。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import arcticdb as adb

uri = "lmdb://demo"
ac = adb.Arctic(uri)

library_name = "demo_db"

if ac.has_library(library_name):
library = ac.get_library(name=library_name)
else:
library = ac.create_library(name=library_name)

print(library.head('test_frame', 5, as_of=0).data)
print(library.head('test_frame', 5, as_of=1).data)

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
                     COL_0  COL_1  COL_2  COL_3  ...  COL_46  COL_47  COL_48  COL_49
2024-01-01 05:00:00 7 46 17 9 ... 48 7 0 4
2024-01-01 06:00:00 17 30 21 44 ... 28 30 6 12
2024-01-01 07:00:00 27 48 34 29 ... 45 30 39 36
2024-01-01 08:00:00 1 46 31 49 ... 7 14 27 36
2024-01-01 09:00:00 34 48 23 8 ... 15 32 33 5

[5 rows x 50 columns]
COL_0 COL_1 COL_2 COL_3 ... COL_46 COL_47 COL_48 COL_49
2024-01-01 05:00:00 14 44 17 36 ... 44 21 40 10
2024-01-01 07:00:00 13 13 29 43 ... 45 1 16 8
2024-01-01 09:00:00 20 0 36 21 ... 32 36 32 23
2024-01-01 10:00:00 10 37 1 40 ... 47 6 11 2
2024-01-01 11:00:00 15 0 19 35 ... 12 43 27 19

[5 rows x 50 columns]

特别的,默认情况下,写入、追加和更新操作不会移除之前的版本,所以这将会占用更多的空间。
我们可以在写入、追加或者更新的时候,通过prune_previous_versions参数定义是否移除之前的版本。

索引

分类

四种索引

ArcticDB支持以下索引类型:

  • Int64Index或UInt64Index
  • RangeIndex
  • DatetimeIndex
  • 由上述支持类型的组成的MultiIndex

Int64Index

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
import pandas as pd

# 创建一个包含整数的列表作为索引
index = pd.Index([10, 20, 30], name='example_index')

# 使用这个索引创建一个 DataFrame
df_int64 = pd.DataFrame({
'A': ['foo', 'bar', 'baz'],
'B': [1, 2, 3]
}, index=index)

print(df_int64)

运行结果:

1
2
3
4
5
                 A  B
example_index
10 foo 1
20 bar 2
30 baz 3

DatetimeIndex

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import pandas as pd

# 创建一个包含日期时间戳的索引
dates = pd.date_range(start="2023-01-01", periods=3, freq='D')
index = pd.DatetimeIndex(dates, name='date')

# 使用这个索引创建一个 DataFrame
df_datetime = pd.DataFrame({
'temperature': [22.5, 24.0, 23.0],
'humidity': [55, 60, 65]
}, index=index)

print("\nDatetimeIndex DataFrame:")
print(df_datetime)

运行结果:

1
2
3
4
5
6
DatetimeIndex DataFrame:
temperature humidity
date
2023-01-01 22.5 55
2023-01-02 24.0 60
2023-01-03 23.0 65

RangeIndex

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import pandas as pd

# 默认情况下,如果你不指定索引,Pandas会自动创建一个 RangeIndex
df_range = pd.DataFrame({
'values': [100, 200, 300]
})

print("RangeIndex DataFrame (default):")
print(df_range)

# 显式创建一个 RangeIndex
range_index = pd.RangeIndex(start=10, stop=13, step=1, name='sequence')
df_explicit_range = pd.DataFrame({
'letters': ['x', 'y', 'z']
}, index=range_index)

print("Explicit RangeIndex DataFrame:")
print(df_explicit_range)

输出将是:

1
2
3
4
5
6
7
8
9
10
11
RangeIndex DataFrame (default):
values
0 100
1 200
2 300
Explicit RangeIndex DataFrame:
letters
sequence
10 x
11 y
12 z

MultiIndex

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import pandas as pd

# 创建一个多层索引(MultiIndex),其中一层是 Int64Index,另一层是 DatetimeIndex
arrays = [
[1, 1, 2, 2], # 第一层索引:Int64Index
pd.date_range('2023-01-01', periods=4) # 第二层索引:DatetimeIndex
]

multi_index = pd.MultiIndex.from_arrays(arrays, names=('group', 'date'))

# 使用这个 MultiIndex 创建一个 DataFrame
df_multi = pd.DataFrame({
'sales': [1000, 1500, 2000, 2500]
}, index=multi_index)

print("MultiIndex DataFrame:")
print(df_multi)

运行结果:

1
2
3
4
5
6
7
8
9
MultiIndex DataFrame:
sales
group date
1 2023-01-01 1000
2023-01-02 1500
2 2023-01-03 2000
2023-01-04 2500

Process finished with exit code 0

自动创建

在ArcticDB中创建索引的方式与传统关系型数据库不同,不需要我们手动创建索引,也没有所谓的创建索引的语句或方法,而是根据我们传入的DataFrame,自动创建。

并行写入

并行写入,也被称为分阶段写入,是指我们并发的往一个Symbol(表)中写入数据。
其特点有:

  • 写入的时候调用write方法,传入参数staged,值为True
  • 写入完成后,必须调用library.finalize_staged_data()

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import arcticdb as adb
import pandas as pd
import numpy as np
from datetime import datetime
from joblib import Parallel, delayed

# 设置连接和库名
uri = "lmdb://demo"
ac = adb.Arctic(uri)
library_name = "demo_db_parallel"

if ac.has_library(library_name):
library = ac.get_library(name=library_name)
else:
library = ac.create_library(name=library_name)

# 创建示例DataFrame
cols = ['COL_%d' % i for i in range(50)]
df = pd.DataFrame(np.random.randint(0, 50, size=(25, 50)), columns=cols)
df.index = pd.date_range(datetime(2024, 1, 1, 5), periods=25, freq="h")

# 分块函数
def write_chunk(chunk_df, chunk_id):
print(chunk_id)
library.write('test_df', chunk_df, staged=True)

# 并行写入函数
def parallel_write(df_val, n_jobs=-1):
# 将DataFrame分块
# 每个chunk包含5行
chunks = [df_val[i:i+5] for i in range(0, len(df_val), 5)]
# 使用joblib进行并行写入
results = Parallel(n_jobs=n_jobs)(
delayed(write_chunk)(chunk, idx) for idx, chunk in enumerate(chunks)
)
return results

# 执行并行写入
parallel_write(df)

# print(library.read('test_df'))
# print(library.read('test_df').data)


# 所有分阶段写入完成后,进行最终化使数据可用
library.finalize_staged_data('test_df')

print(library.read('test_df'))
print(library.read('test_df').data)

快照

什么是快照

快照,Snapshot,保存数据的一个固定版本,将来可以恢复到这个版本。

其特点有:

  • 创建快照是一个轻量级的操作,不会影响正在进行的读取或写入操作。
  • 当有多个快照时候,第二个快照只存储与上一个快照相比发生变化的数据,相对节省存储空间。

操作

创建快照

snapshotLibrary对象的方法,创建快照。
有四个参数:

  • snapshot_name
  • metadata 元数据
  • skip_symbolsList[str],跳过的Symbol(表)
  • versionsDict[str, int],例如versions={"a": 2, "b": 3},快照2版本的a、3版本的b

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import arcticdb as adb

import pandas as pd
import numpy as np
from datetime import datetime

uri = "lmdb://demo"
ac = adb.Arctic(uri)

library_name = "sdb"

if ac.has_library(library_name):
library = ac.get_library(name=library_name)
else:
library = ac.create_library(name=library_name)

cols = ['COL_%d' % i for i in range(10)]
df = pd.DataFrame(np.random.randint(0, 50, size=(10, 10)), columns=cols)
df.index = pd.date_range(datetime(2024, 1, 1, 5), periods=10, freq="h")
library.write('test_frame', df)

# 为整个库创建快照
library.snapshot("snapshot_name")

列出所有快照

list_snapshotsLibrary对象的方法,其中一个参数是load_metadata,决定了是否加载每个快照的元数据(metadata)。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import arcticdb as adb

uri = "lmdb://demo"
ac = adb.Arctic(uri)

library_name = "sdb"

if ac.has_library(library_name):
library = ac.get_library(name=library_name)
else:
library = ac.create_library(name=library_name)

snapshot_list = library.list_snapshots()

print(snapshot_list)

运行结果:

1
{'snapshot_name': None}

解释说明:本文我们没有定义快照的元数据。

读取快照

基于readas_of参数指定快照。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import arcticdb as adb

uri = "lmdb://demo"
ac = adb.Arctic(uri)

library_name = "sdb"

if ac.has_library(library_name):
library = ac.get_library(name=library_name)
else:
library = ac.create_library(name=library_name)

library.delete('test_frame')

try:
print(library.read(symbol='test_frame').data)
except Exception:
print(Exception)

print(library.read(symbol='test_frame',as_of='snapshot_name').data)

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
20241227 22:42:46.889488 19816 W arcticdb | Nothing to delete for symbol 'test_frame'
<class 'Exception'>
COL_0 COL_1 COL_2 COL_3 ... COL_6 COL_7 COL_8 COL_9
2024-01-01 05:00:00 16 44 26 16 ... 29 40 34 45
2024-01-01 06:00:00 6 43 25 14 ... 16 24 33 38
2024-01-01 07:00:00 35 17 22 49 ... 5 22 33 28
2024-01-01 08:00:00 20 24 29 13 ... 45 10 45 14
2024-01-01 09:00:00 21 41 35 3 ... 32 7 4 41
2024-01-01 10:00:00 29 42 45 0 ... 6 14 15 38
2024-01-01 11:00:00 27 42 38 15 ... 38 4 9 42
2024-01-01 12:00:00 23 3 5 8 ... 47 6 22 2
2024-01-01 13:00:00 20 35 4 11 ... 7 6 22 45
2024-01-01 14:00:00 18 11 16 13 ... 47 23 41 5

[10 rows x 10 columns]

删除快照

delete_snapshotLibrary对象的方法,删除快照。

元数据

什么是元数据

元数据,Metadata,辅助信息,元数据可以用来描述数据的结构、内容、质量以及其他属性,帮助用户更好地理解和处理这些数据。

操作

添加元数据

两种方法:

  • write写入数据时,可以通过metadata参数传递一个字典形式的元数据。
  • write_metadata

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from datetime import datetime

import arcticdb as adb
import numpy as np
import pandas as pd

uri = "lmdb://demo"
ac = adb.Arctic(uri)

library_name = "mdb"

if ac.has_library(library_name):
library = ac.get_library(name=library_name)
else:
library = ac.create_library(name=library_name)


cols = ['COL_%d' % i for i in range(10)]
df = pd.DataFrame(np.random.randint(0, 50, size=(10, 10)), columns=cols)
df.index = pd.date_range(datetime(2024, 1, 1, 5), periods=10, freq="h")

# 写入数据并附带元数据
metadata = {
'created_by': 'data_engineer',
'description': 'Test dataset for demonstration purposes.',
'version': '1.0'
}
library.write('test_symbol', df, metadata=metadata)

获取元数据

两种方法:

  1. library.read('test_symbol').metadata
  2. library.read_metadata('test_symbol')

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import arcticdb as adb

uri = "lmdb://demo"
ac = adb.Arctic(uri)

library_name = "mdb"

if ac.has_library(library_name):
library = ac.get_library(name=library_name)
else:
library = ac.create_library(name=library_name)

print(library.read('test_symbol').metadata)

print(library.read_metadata('test_symbol'))

更新元数据

再写一遍,即可更新。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import arcticdb as adb

uri = "lmdb://demo"
ac = adb.Arctic(uri)

library_name = "mdb"

if ac.has_library(library_name):
library = ac.get_library(name=library_name)
else:
library = ac.create_library(name=library_name)


new_metadata = {
'created_by': 'data_engineer',
'description': 'Updated test dataset with more information.',
'version': '2.0'
}
library.write_metadata(symbol='test_symbol',metadata=new_metadata)

print(library.read_metadata(symbol='test_symbol'))
文章作者: Kaka Wan Yifan
文章链接: https://kakawanyifan.com/29902
版权声明: 本博客所有文章版权为文章作者所有,未经书面许可,任何机构和个人不得以任何形式转载、摘编或复制。

留言板