1.CEPH通信连接模型:
首先通信双方建立socket连接,然后server端会向client发送banner和地址信息
Banner的定义如下:
"ceph %x %xn",protocol_features_suppored,protocol_features_required
在系统中声明为常量,用于交换协议信息。地址信息通过linux的msghdr结构体传递。
Client接收到banner和地址信息验证通过后,也会向server端发送自己的banner和地址信息。
然后client向server端发送connect信息,server端收到后会验证,成功则回复reply connect信息。涉及到的两个结构体如下:
struct ceph_msg_connect {
__le64 features; /* supported feature bits */
__le32 host_type; /* CEPH_ENTITY_TYPE_* */
__le32 global_seq; /* count connections initiated by this host */
__le32 connect_seq; /* count connections initiated in this session */
__le32 protocol_version;
__le32 authorizer_protocol;
__le32 authorizer_len;
__u8 flags; /* CEPH_MSG_CONNECT_* */
} __attribute__ ((packed));
struct ceph_msg_connect_reply {
__u8 tag;
__le64 features; /* feature bits for this session */
__le32 global_seq;
__le32 connect_seq;
__le32 protocol_version;
__le32 authorizer_len;
__u8 flags;
} __attribute__ ((packed));
对于新连接而言,global_seq一般为0。
然后通信双方建立pipe,会话就建立完成了。
2.消息发送接收流程
发送流程如下:
接收流程如下:
消息结构介绍:
Message 是所有消息的基类,任何要发送的消息,都要继承该类。对于消息,其发送格式如下:
Header | User data | Footer |
---|
User data包括:
Payload | Middle | Data |
---|
payload, 一般保存ceph操作的元数据;middle 预留,目前没有使用到;data 一般为读写的数据。
整个Message的结构如下:
class Message{
ceph_msg_header header; *// 消息头*
ceph_msg_footer footer; *// 消息尾*
bufferlist payload; *// "front" unaligned blob*
bufferlist middle; *// "middle" unaligned blob*
bufferlist data;
utime_t recv_stamp; *//开始接收数据的时间戳*
utime_t dispatch_stamp; *// dispatch 的时间戳*
utime_t throttle_stamp; */\* time at which message was fully read \*/*
utime_t recv_complete_stamp; *//接收完成的时间戳*
ConnectionRef connection; *//链接*
uint32_t magic; bi::list_member_hook<> dispatch_q; *//boost::intrusive list 的 member* }
ceph_msg_header 主要是封装数据相关信息:
struct ceph_msg_header {
__le64 seq; /* message seq*# for this session ## 当前session内 消息的唯一 序号\*/* __le64 tid; /* transaction id *## 消息的全局唯一的 id\*/*
__le16 type; /* message type *## 消息类型 \*/*
__le16 priority; /* priority. higher value == higher priority */
__le16 version; /* version of message encoding */
__le32 front_len; /* bytes in main payload *## payload 的长度\*/*
__le32 middle_len; /* bytes in middle payload *## middle 的长度\*/*
__le32 data_len; /* bytes of data payload *## data 的 长度 \*/*
... ... ... } __attribute__ ((packed));
ceph_msg_footer 主要是封装结束标记和CRC校验码:
struct ceph_msg_footer {
__le32 front_crc, middle_crc, data_crc;*//三个部分的 crc 效验码*
__le64 sig; *// 消息的64位 signature*
__u8 flags; *//结束标志* } __attribute__ ((packed));
3. 消息类型:
3.1 Monitor自身
消息类型 | 消息结构体 | 消息作用 | 处理接口 |
---|---|---|---|
CEPH_MSG_PING | MPing | 定期Ping Monitor确认Monitor的存在 | handle_ping |
CEPH_MSG_MON_GET_MAP | MMonGetMap | 认证前获取MonMap | handle_mon_get_map |
CEPH_MSG_MON_METADATA | MMonMetadata | 处理保存某个Monitor的系统信息(cpu,内存等) | handle_mon_metadata |
MSG_MON_COMMAND | MMonCommand | 传递命令行消息给Monitor,Monitor再分发给相应的XXXMonitor进行处理 | handle_command |
CEPH_MSG_MON_GET_VERSION | MMonGetVersion | 获取cluster map的版本信息 | handle_get_version |
CEPH_MSG_MON_SUBSCRIBE | MMonSubscribe | Cluster map订阅更新 | handle_subscribe |
MSG_ROUTE | MRoute | 路由请求转发(待确认) | handle_route |
MSG_MON_PROBE | MMonProbe | 启动加入时需要向其他Monitor发送Probe请求 | handle_probe |
MSG_MON_SYNC | MMonSync | 同步Paxos状态数据 | handle_sync |
MSG_MON_SCRUB | MMonScrub | MonitorDBStore数据一致性检测 | handle_scrub |
MSG_MON_JOIN | MMonJoin | 如果不在MonMap中申请加入到MonMap | MonmapMonitor::prepare_join |
MSG_MON_PAXOS | MMonPaxos | 选举完成后,leader会触发Paxos::leader_init,状态置为STATE_RECOVERING,并发起该消息的OP_COLLECT流程 | Paxos::dispatch |
MSG_MON_ELECTION | MMonElection | 发起选举流程 | Elector::dispatch |
MSG_FORWARD | MForward | 将请求转发到leader | handle_forward |
MSG_TIMECHECK | MTimeCheck | Monitor每隔mon_timecheck_interval检测所有Monitor的系统时间来检测节点之间的时间差 | handle_timecheck |
MSG_MON_HEALTH | MMonHealth | 每隔mon_health_data_update_interval检测存放Monitor上面使用的leveldb数据的状态 | HealthMonitor::service_dispatch |
3.2 AuthMonitor
消息类型 | 消息结构体 | 消息作用 | 处理接口 |
---|---|---|---|
MSG_MON_COMMAND | MMonCommand | 处理ceph auth xxx命令行相关处理 | preprocess_command处理ceph auth get/export/list等 prepare_command处理ceph auth import/add/get-or-create/caps等 |
CEPH_MSG_AUTH | MAuth | 实现认证和授权消息处理 | prep_auth |
3.3 OSDMonitor
消息类型 | 消息结构体 | 消息作用 | 处理接口 |
---|---|---|---|
CEPH_MSG_MON_GET_OSDMAP | MMonGetOSDMap | 获取OSDMap | preprocess_get_osdmap |
MSG_OSD_MARK_ME_DOWN | MOSDMarkMeDown | OSD shutdown之前通知Monitor发送该消息 | preprocess_mark_me_down prepare_mark_me_down |
MSG_OSD_FAILURE | MOSDFailure | 1. OSD每隔OSD_TICK_INTERVAL检测心跳无响应的OSD,并将失败的OSD report给Monitor 2. Monitor判断上报次数>=mon_osd_min_down_reports,那么就将target_osd标识为down | preprocess_failure |
MSG_OSD_BOOT | MOSDBoot | 新OSD加入时发送请求到Monitor,参考新OSD的加入流程 | preprocess_bootprepare_boot |
MSG_OSD_ALIVE | MOSDAlive | OSD判断up_thru_wanted决定是否发送请求给Monitor,Monitor发送Incremental OSDMap返回给OSD | preprocess_alive prepare_alive |
MSG_OSD_PGTEMP | MOSDPGTemp | Primary OSD处于backfilling状态无法提供读取服务时,会发送该消息到Monitor,将PG临时映射到其他的OSD上提供去服务 | preprocess_pgtemp prepare_pgtemp |
MSG_REMOVE_SNAPS | MRemoveSnaps | 删除快照信息 | prepare_remove_snaps |
CEPH_MSG_POOLOP | MPoolOp | 删除/创建Pool,创建/删除pool快照等 | prepare_pool_op |
3.4 PGMonitor
消息类型 | 消息结构体 | 消息作用 | 处理接口 |
---|---|---|---|
CEPH_MSG_STATFS | MStatfs | 返回文件系统osd占用的kb容量 | handle_statfs |
MSG_PGSTATS | MPGStats | 查询或者更新pg状态 | preprocess_pg_stats prepare_pg_stats |
MSG_GETPOOLSTATS | MGetPoolStats | 获取pool汇总状态信息 | preprocess_getpoolstats |
MSG_MON_COMMAND | MMonCommand | 处理ceph pg xxx相关命令行 | preprocess_command |
3.5 MonMapMonitor
消息类型 | 消息结构体 | 消息作用 | 处理接口 |
---|---|---|---|
MSG_MON_JOIN | MMonJoin | 更新MonMap | preprocess_join prepare_join |
MSG_MON_COMMAND | MMonCommand | 处理ceph mon xxx相关命令行 | preprocess_command prepare_command |
3.6 MDSMonitor
消息类型 | 消息结构体 | 消息作用 | 处理接口 |
---|---|---|---|
MSG_MDS_BEACON | |||
MSG_MDS_OFFLOAD_TARGETS |
附:
Msgr.h文件:
1.Msgr.h文件:定义消息传输层的数据类型,以供ceph使用
(1)默认的监控端口:
#define CEPH_MON_PORT 6789
(2)客户端处理端口范围定义:
#define CEPH_PORT_FIRST 6789//监控
#define CEPH_PORT_START 6800 //开始
#define CEPH_PORT_LAST 6900//结束
(3)tcp协议标识和版本信息:
#define CEPH_BANNER "ceph v027"
#define CEPH_BANNER_MAX_LEN 30//最大长度
(4)ceph中的实体名称:在网络传输中使用,例如mds0表示元数据服务器0
struct ceph_entity_name {
_u8 type; /* CEPH_ENTITY_TYPE* /*
__le64 num;
} attribute ((packed));//按照紧凑模式分配内存,而不是内存对齐
#define CEPH_ENTITY_TYPE_MON 0x01//监控服务器实体
#define CEPH_ENTITY_TYPE_MDS 0x02//元数据服务器实体
#define CEPH_ENTITY_TYPE_OSD 0x04//对象存储设备实体
#define CEPH_ENTITY_TYPE_CLIENT 0x08//客户端实体
#define CEPH_ENTITY_TYPE_AUTH 0x20//权限实体
#define CEPH_ENTITY_TYPE_ANY 0xFF//任何
同时提供一个函数用于将类型转换为名字字符串的函数如下:
const char *ceph_entity_type_name(int type)
{
switch (type) {
case CEPH_ENTITY_TYPE_MDS: return "mds";
case CEPH_ENTITY_TYPE_OSD: return "osd";
case CEPH_ENTITY_TYPE_MON: return "mon";
case CEPH_ENTITY_TYPE_CLIENT: return "client";
case CEPH_ENTITY_TYPE_AUTH: return "auth";
default: return "unknown";
}
}
(5)实体网络地址以及与名字实体的对应关系结构体
struct ceph_entity_addr {
__le32 type;
__le32 nonce; /* unique id for process (e.g. pid) */
struct sockaddr_storage in_addr;
} attribute ((packed));
struct ceph_entity_inst {
struct ceph_entity_name name;
struct ceph_entity_addr addr;
} attribute ((packed));
(6)消息交换协议定义:
#define CEPH_MSGR_TAG_READY 1 /* server->client: ready for messages */
#define CEPH_MSGR_TAG_RESETSESSION 2 /* server->client: reset, try again */
#define CEPH_MSGR_TAG_WAIT 3 /* server->client: wait for racing
incoming connection */
#define CEPH_MSGR_TAG_RETRY_SESSION 4 /* server->client + cseq: try again
with higher cseq */
#define CEPH_MSGR_TAG_RETRY_GLOBAL 5 /* server->client + gseq: try again
with higher gseq */
#define CEPH_MSGR_TAG_CLOSE 6 /* closing pipe */
#define CEPH_MSGR_TAG_MSG 7 /* message */
#define CEPH_MSGR_TAG_ACK 8 /* message ack */
#define CEPH_MSGR_TAG_KEEPALIVE 9 /* just a keepalive byte! */
#define CEPH_MSGR_TAG_BADPROTOVER 10 /* bad protocol version */
#define CEPH_MSGR_TAG_BADAUTHORIZER 11 /* bad authorizer */
#define CEPH_MSGR_TAG_FEATURES 12 /* insufficient features */
#define CEPH_MSGR_TAG_SEQ 13 /* 64-bit int follows with seen seq number */
(7)连接协商
struct ceph_msg_connect {//连接消息结构体
__le64 features; /* supported feature bits */支持的特征位
_le32 host_type; /* CEPH_ENTITY_TYPE* /*上面提到的实体类型
__le32 global_seq; /* count connections initiated by this host */主机初始化的连接数量
__le32 connect_seq; /* count connections initiated in this session */在这个对话中的连接数量
__le32 protocol_version;//协议版本
__le32 authorizer_protocol;//权限协议
__le32 authorizer_len;//权限长度
_u8 flags; /* CEPH_MSG_CONNECT* /*
} attribute ((packed));
struct ceph_msg_connect_reply {//连接回复消息结构体
__u8 tag;//上面定义传递消息的类型
__le64 features; /* feature bits for this session */这个对话的特征位
__le32 global_seq;
__le32 connect_seq;
__le32 protocol_version;
__le32 authorizer_len;
__u8 flags;
} attribute ((packed));
#define CEPH_MSG_CONNECT_LOSSY 1 /* messages i send may be safely dropped */
(8)消息头部结构:有老的
struct ceph_msg_header {
__le64 seq; /* message seq# for this session */
__le64 tid; /* transaction id */
__le16 type; /* message type */
__le16 priority; /* priority. higher value == higher priority */
__le16 version; /* version of message encoding */
__le32 front_len; /* bytes in main payload */
__le32 middle_len;/* bytes in middle payload */
__le32 data_len; /* bytes of data payload */
__le16 data_off; /* sender: include full offset;
receiver: mask against ~PAGE_MASK */
struct ceph_entity_name src;
* oldest code we think can decode this. unknown if zero. */
__le16 compat_version;
__le16 reserved;
__le32 crc; /* header crc32c */
} attribute ((packed));
#define CEPH_MSG_PRIO_LOW 64
#define CEPH_MSG_PRIO_DEFAULT 127
#define CEPH_MSG_PRIO_HIGH 196
#define CEPH_MSG_PRIO_HIGHEST 255
(9)数据有效载荷结构
struct ceph_msg_footer {
__le32 front_crc, middle_crc, data_crc;
__u8 flags;
} attribute ((packed));
#define CEPH_MSG_FOOTER_COMPLETE (1<<0) /* msg wasn't aborted */
#define CEPH_MSG_FOOTER_NOCRC (1<<1) /* no data crc */
MSGR2协议:
定义:
client(C):发起(TCP)连接的一方
server(S):接受(TCP)连接的一方
connection:两个进程之间的(TCP)连接的实例。
entity:ceph实体实例,例如“ osd.0”。每个实体凭借“ nonce”字段(通常是pid或随机值)具有一个或多个唯一的entity_addr_t。
session:两个实体之间的有状态会话,其中消息交换是有序的,并且是无损的。如果存在中断(TCP连接断开连接),则会话可能跨越多个连接。
frame:在对等体之间发送的离散消息。每个帧都包含一个标签(类型代码),有效负载以及(如果启用了签名或加密)其他一些字段。有关结构,请参见下文。
tag:与框架关联的类型代码。标签确定有效载荷的结构。
阶段:
连接具有四个不同的阶段:
1.Banner
2.认证帧交换
3.消息流握手帧交换
4.消息帧交换
BANNER:
客户端和服务器在连接后均会发送
"ceph %x %xn",protocol_features_suppored, protocol_features_required
初始化为“ceph 0 0n”.
帧格式:
发送或接收的所有其他数据都包含在一个帧中。每个框架具有以下形式:
frame_len (le32)
tag (TAG_* le32)
frame_header_checksum (le32)
payload
[payload padding -- only present after stream auth phase]
[signature -- only present after stream auth phase]
frame_header_checksum仅在frame_len和tag(8个字节)上。
frame_len包括frame_len le32之后到帧末尾的所有内容(所有 payloads, signatures,和填充(padding))。
payload 的格式和长度由tag确定。
仅当身份验证阶段已完成(TAG_AUTH_DONE已发送)并且启用了签名时,signatures 部分才存在。