Greenplum 资源组可以分配给一个或多个角色(用户)。使用 CREATE ROLE 或 ALTER ROLE 命令的 RESOURCE GROUP 子句即可将资源组分配给数据库角色。如果未为角色指定资源组,则会根据角色的权限为其分配默认组:SUPERUSER 角色分配 admin_group,非管理员角色分配名为 default_group 的组。
staticResGroupSlotData*groupGetSlot(ResGroupData*group){ResGroupSlotData*slot;ResGroupCaps*caps;int32slotMemQuota;Assert(LWLockHeldExclusiveByMe(ResGroupLock));Assert(Gp_role==GP_ROLE_DISPATCH);Assert(groupIsNotDropped(group));caps=&group->caps;/* First check if the concurrency limit is reached */if(group->nRunning>=caps->concurrency)returnNULL;slotMemQuota=groupReserveMemQuota(group);if(slotMemQuota<0)returnNULL;/* Now actually get a free slot */slot=slotpoolAllocSlot();Assert(!slotIsInUse(slot));initSlot(slot,group,slotMemQuota);group->nRunning++;returnslot;}
Resource Group 通过 cgroup 实现对 cpu 的限制。在创建或者修改 resource group 时(比如 initCpu ),数据库会在操作系统的 cgroup 路径下,创建以该 resource group 的 oid 命名的 cgroup 子路径(即完成对应的挂载),并根据设置的 CPU 配置,更新该 cgroup 子路径下的 cpu.cfs_period_us 、cpu.cfs_quota_us 、cpu.shares 文件。
voidResGroupOps_AssignGroup(Oidgroup,ResGroupCaps*caps,intpid){boololdViaCpuset=oldCaps.cpuRateLimit==CPU_RATE_LIMIT_DISABLED;boolcurViaCpuset=caps?caps->cpuRateLimit==CPU_RATE_LIMIT_DISABLED:false;/* needn't write to file if the pid has already been written in.
* Unless it has not been writtien or the group has changed or
* cpu control mechanism has changed */if(IsUnderPostmaster&&group==currentGroupIdInCGroup&&caps!=NULL&&oldViaCpuset==curViaCpuset)return;writeInt64(group,BASETYPE_GPDB,RESGROUP_COMP_TYPE_CPU,"cgroup.procs",pid);writeInt64(group,BASETYPE_GPDB,RESGROUP_COMP_TYPE_CPUACCT,"cgroup.procs",pid);if(gp_resource_group_enable_cgroup_cpuset){if(caps==NULL||!curViaCpuset){/* add pid to default group */writeInt64(DEFAULT_CPUSET_GROUP_ID,BASETYPE_GPDB,RESGROUP_COMP_TYPE_CPUSET,"cgroup.procs",pid);}else{writeInt64(group,BASETYPE_GPDB,RESGROUP_COMP_TYPE_CPUSET,"cgroup.procs",pid);}}/*
* Do not assign the process to cgroup/memory for now.
*/currentGroupIdInCGroup=group;if(caps!=NULL){oldCaps.cpuRateLimit=caps->cpuRateLimit;StrNCpy(oldCaps.cpuset,caps->cpuset,sizeof(oldCaps.cpuset));}}
数据库会将对应进程的 pid 写进子路径的 cgroup.procs 文件里,从而利用操作系统的 cgroup 能力对进程的 cpu 使用进行限制。
staticint32groupIncMemUsage(ResGroupData*group,ResGroupSlotData*slot,int32chunks){int32slotMemUsage;/* the memory current slot has been used */int32sharedMemUsage;/* the total shared memory usage,
sum of group share and global share */int32globalOveruse=0;/* the total over used chunks of global share*//* Add the chunks to memUsage in slot */slotMemUsage=pg_atomic_add_fetch_u32((pg_atomic_uint32*)&slot->memUsage,chunks);/* Check whether shared memory should be added */sharedMemUsage=slotMemUsage-slot->memQuota;if(sharedMemUsage>0){/* Decide how many chunks should be counted as shared memory */int32deltaSharedMemUsage=Min(sharedMemUsage,chunks);/* Add these chunks to memSharedUsage in group,
* and record the old value*/int32oldSharedUsage=pg_atomic_fetch_add_u32((pg_atomic_uint32*)&group->memSharedUsage,deltaSharedMemUsage);/* the free space of group share */int32oldSharedFree=Max(0,group->memSharedGranted-oldSharedUsage);/* Calculate the global over used chunks */int32deltaGlobalSharedMemUsage=Max(0,deltaSharedMemUsage-oldSharedFree);/* freeChunks -= deltaGlobalSharedMemUsage and get the new value */int32newFreeChunks=pg_atomic_sub_fetch_u32((pg_atomic_uint32*)&pResGroupControl->freeChunks,deltaGlobalSharedMemUsage);/* calculate the total over used chunks of global share */globalOveruse=Max(0,0-newFreeChunks);}/* Add the chunks to memUsage in group */pg_atomic_add_fetch_u32((pg_atomic_uint32*)&group->memUsage,chunks);returnglobalOveruse;}
在进行内存限制时,对于某个 slot 的内存请求,首先会通过原子相加的方式从资源组的固定份额部分获取内存;而如果所需要的内存超过固定份额的内存量,会尝试从资源组内的共享内存部分获取;如果依然无法获取到内存,则会尝试从全局的共享内存获取。如果从全局共享内存依然无法获取到内存,则会返回 Out of Memory 错误。
staticvoidgetResUsage(ResGroupStatCtx*ctx,OidinGroupId){int64*usages;TimestampTz*timestamps;inti,j;usages=palloc(sizeof(*usages)*ctx->nGroups);timestamps=palloc(sizeof(*timestamps)*ctx->nGroups);for(j=0;j<ctx->nGroups;j++){ResGroupStat*row=&ctx->groups[j];OidgroupId=DatumGetObjectId(row->groupId);usages[j]=ResGroupOps_GetCpuUsage(groupId);timestamps[j]=GetCurrentTimestamp();}if(Gp_role==GP_ROLE_DISPATCH){CdbPgResultscdb_pgresults={NULL,0};StringInfoDatabuffer;initStringInfo(&buffer);appendStringInfo(&buffer,"SELECT groupid, cpu_usage, memory_usage ""FROM pg_resgroup_get_status(%d)",inGroupId);CdbDispatchCommand(buffer.data,DF_WITH_SNAPSHOT,&cdb_pgresults);if(cdb_pgresults.numResults==0)elog(ERROR,"pg_resgroup_get_status() didn't get back any resource statistic from the segDBs");for(i=0;i<cdb_pgresults.numResults;i++){structpg_result*pg_result=cdb_pgresults.pg_results[i];/*
* Any error here should have propagated into errbuf, so we shouldn't
* ever see anything other that tuples_ok here. But, check to be
* sure.
*/if(PQresultStatus(pg_result)!=PGRES_TUPLES_OK){cdbdisp_clearCdbPgResults(&cdb_pgresults);elog(ERROR,"pg_resgroup_get_status(): resultStatus not tuples_Ok");}Assert(PQntuples(pg_result)==ctx->nGroups);for(j=0;j<ctx->nGroups;j++){constchar*result;ResGroupStat*row=&ctx->groups[j];OidgroupId=pg_atoi(PQgetvalue(pg_result,j,0),sizeof(Oid),0);Assert(groupId==row->groupId);if(row->memUsage->len==0){Datumd=ResGroupGetStat(groupId,RES_GROUP_STAT_MEM_USAGE);row->groupId=groupId;appendStringInfo(row->memUsage,"{\"%d\":%s",GpIdentity.segindex,DatumGetCString(d));appendStringInfo(row->cpuUsage,"{");calcCpuUsage(row->cpuUsage,usages[j],timestamps[j],ResGroupOps_GetCpuUsage(groupId),GetCurrentTimestamp());}result=PQgetvalue(pg_result,j,1);appendStringInfo(row->cpuUsage,", %s",result);result=PQgetvalue(pg_result,j,2);appendStringInfo(row->memUsage,", %s",result);if(i==cdb_pgresults.numResults-1){appendStringInfoChar(row->cpuUsage,'}');appendStringInfoChar(row->memUsage,'}');}}}cdbdisp_clearCdbPgResults(&cdb_pgresults);}else{pg_usleep(300000);for(j=0;j<ctx->nGroups;j++){ResGroupStat*row=&ctx->groups[j];OidgroupId=DatumGetObjectId(row->groupId);Datumd=ResGroupGetStat(groupId,RES_GROUP_STAT_MEM_USAGE);appendStringInfo(row->memUsage,"\"%d\":%s",GpIdentity.segindex,DatumGetCString(d));calcCpuUsage(row->cpuUsage,usages[j],timestamps[j],ResGroupOps_GetCpuUsage(groupId),GetCurrentTimestamp());}}}
而对于 cpu 使用率的统计,会在一开始先调用 ResGroupOps_GetCpuUsage ,通过读取 cgroup 对应节点下的 cpu 统计结果,得到第一次 cpu 使用量。然后 sleep 300000 us (在 dispatcher 上则是等待各 segment 返回结果),重新调用 ResGroupOps_GetCpuUsage 再获取一次 cpu 使用量,最后根据两次结果的差值以及两次采样的时间间隔计算出 cpu 的统计结果。