Skip to content

Commit

Permalink
optimize the Group operator. (#20818)
Browse files Browse the repository at this point in the history
## What type of PR is this?

- [ ] API-change
- [ ] BUG
- [x] Improvement
- [ ] Documentation
- [ ] Feature
- [ ] Test and CI
- [ ] Code Refactoring

## Which issue(s) this PR fixes:

#20766

## What this PR does / why we need it:
1. 当 Group 算子只需要计算分组聚合的中间结果时,每当 group count 达到 8192 * 4 行发送一次。
2. 补充 Group 算子的 result preextend 能力。
3. 调整 Group 算子的 AppendBatch 策略。

---------

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
Co-authored-by: fengttt <[email protected]>
  • Loading branch information
3 people authored Dec 19, 2024
1 parent 696ef35 commit e271f9b
Show file tree
Hide file tree
Showing 10 changed files with 250 additions and 122 deletions.
24 changes: 10 additions & 14 deletions pkg/sql/colexec/aggexec/fromBytesRetBytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,12 +224,11 @@ func (exec *aggregatorFromBytesToBytes) BulkFill(
}

exec.arg.prepare(vectors[0])
bs := exec.ret.getEmptyListOnX(x)
if exec.arg.w.WithAnyNullValue() {
for i, j := uint64(0), uint64(length); i < j; i++ {
v, null := exec.arg.w.GetStrValue(i)
if !null {
if err := exec.fill(groupContext, commonContext, v, bs[y], getter, setter); err != nil {
if err := exec.fill(groupContext, commonContext, v, exec.ret.bsFromEmptyList[x][y], getter, setter); err != nil {
return err
}
exec.ret.setGroupNotEmpty(x, y)
Expand All @@ -240,7 +239,7 @@ func (exec *aggregatorFromBytesToBytes) BulkFill(

for i, j := uint64(0), uint64(length); i < j; i++ {
v, _ := exec.arg.w.GetStrValue(i)
if err := exec.fill(groupContext, commonContext, v, bs[y], getter, setter); err != nil {
if err := exec.fill(groupContext, commonContext, v, exec.ret.bsFromEmptyList[x][y], getter, setter); err != nil {
return err
}
exec.ret.setGroupNotEmpty(x, y)
Expand Down Expand Up @@ -271,13 +270,12 @@ func (exec *aggregatorFromBytesToBytes) distinctBulkFill(
return err
}

bs := exec.ret.getEmptyListOnX(x)
if exec.arg.w.WithAnyNullValue() {
for i, j := uint64(0), uint64(length); i < j; i++ {
if needs[i] {
v, null := exec.arg.w.GetStrValue(i)
if !null {
if err = exec.fill(groupContext, commonContext, v, bs[y], getter, setter); err != nil {
if err = exec.fill(groupContext, commonContext, v, exec.ret.bsFromEmptyList[x][y], getter, setter); err != nil {
return err
}
exec.ret.setGroupNotEmpty(x, y)
Expand All @@ -290,7 +288,7 @@ func (exec *aggregatorFromBytesToBytes) distinctBulkFill(
for i, j := uint64(0), uint64(length); i < j; i++ {
if needs[i] {
v, _ := exec.arg.w.GetStrValue(i)
if err = exec.fill(groupContext, commonContext, v, bs[y], getter, setter); err != nil {
if err = exec.fill(groupContext, commonContext, v, exec.ret.bsFromEmptyList[x][y], getter, setter); err != nil {
return err
}
exec.ret.setGroupNotEmpty(x, y)
Expand All @@ -312,7 +310,6 @@ func (exec *aggregatorFromBytesToBytes) BatchFill(
getter := exec.ret.get
setter := exec.ret.set
commonContext := exec.execContext.getCommonContext()
bs := exec.ret.getEmptyList()

if vectors[0].IsConst() {
value := vectors[0].GetBytesAt(0)
Expand All @@ -321,7 +318,7 @@ func (exec *aggregatorFromBytesToBytes) BatchFill(
idx := int(group - 1)
x, y := exec.ret.updateNextAccessIdx(idx)
if err := exec.fill(
exec.execContext.getGroupContext(idx), commonContext, value, bs[x][y], getter, setter); err != nil {
exec.execContext.getGroupContext(idx), commonContext, value, exec.ret.bsFromEmptyList[x][y], getter, setter); err != nil {
return err
}
exec.ret.setGroupNotEmpty(x, y)
Expand All @@ -339,7 +336,7 @@ func (exec *aggregatorFromBytesToBytes) BatchFill(
groupIdx := int(groups[idx] - 1)
x, y := exec.ret.updateNextAccessIdx(groupIdx)
if err := exec.fill(
exec.execContext.getGroupContext(groupIdx), commonContext, v, bs[x][y], getter, setter); err != nil {
exec.execContext.getGroupContext(groupIdx), commonContext, v, exec.ret.bsFromEmptyList[x][y], getter, setter); err != nil {
return err
}
exec.ret.setGroupNotEmpty(x, y)
Expand All @@ -356,7 +353,7 @@ func (exec *aggregatorFromBytesToBytes) BatchFill(
groupIdx := int(groups[idx] - 1)
x, y := exec.ret.updateNextAccessIdx(groupIdx)
if err := exec.fill(
exec.execContext.getGroupContext(groupIdx), commonContext, v, bs[x][y], getter, setter); err != nil {
exec.execContext.getGroupContext(groupIdx), commonContext, v, exec.ret.bsFromEmptyList[x][y], getter, setter); err != nil {
return err
}
exec.ret.setGroupNotEmpty(x, y)
Expand All @@ -371,7 +368,6 @@ func (exec *aggregatorFromBytesToBytes) distinctBatchFill(
getter := exec.ret.get
setter := exec.ret.set
commonContext := exec.execContext.getCommonContext()
bs := exec.ret.getEmptyList()

needs, err := exec.distinctHash.batchFill(vectors, offset, groups)
if err != nil {
Expand All @@ -385,7 +381,7 @@ func (exec *aggregatorFromBytesToBytes) distinctBatchFill(
idx := int(group - 1)
x, y := exec.ret.updateNextAccessIdx(idx)
if err = exec.fill(
exec.execContext.getGroupContext(idx), commonContext, value, bs[x][y], getter, setter); err != nil {
exec.execContext.getGroupContext(idx), commonContext, value, exec.ret.bsFromEmptyList[x][y], getter, setter); err != nil {
return err
}
exec.ret.setGroupNotEmpty(x, y)
Expand All @@ -403,7 +399,7 @@ func (exec *aggregatorFromBytesToBytes) distinctBatchFill(
groupIdx := int(groups[idx] - 1)
x, y := exec.ret.updateNextAccessIdx(groupIdx)
if err = exec.fill(
exec.execContext.getGroupContext(groupIdx), commonContext, v, bs[x][y], getter, setter); err != nil {
exec.execContext.getGroupContext(groupIdx), commonContext, v, exec.ret.bsFromEmptyList[x][y], getter, setter); err != nil {
return err
}
exec.ret.setGroupNotEmpty(x, y)
Expand All @@ -420,7 +416,7 @@ func (exec *aggregatorFromBytesToBytes) distinctBatchFill(
groupIdx := int(groups[idx] - 1)
x, y := exec.ret.updateNextAccessIdx(groupIdx)
if err = exec.fill(
exec.execContext.getGroupContext(groupIdx), commonContext, v, bs[x][y], getter, setter); err != nil {
exec.execContext.getGroupContext(groupIdx), commonContext, v, exec.ret.bsFromEmptyList[x][y], getter, setter); err != nil {
return err
}
exec.ret.setGroupNotEmpty(x, y)
Expand Down
24 changes: 10 additions & 14 deletions pkg/sql/colexec/aggexec/fromBytesRetFixed.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,12 +322,11 @@ func (exec *aggregatorFromBytesToFixed[to]) BulkFill(
}

exec.arg.prepare(vectors[0])
bs := exec.ret.getEmptyListOnX(x)
if exec.arg.w.WithAnyNullValue() {
for i, j := uint64(0), uint64(length); i < j; i++ {
v, null := exec.arg.w.GetStrValue(i)
if !null {
if err := exec.fill(groupContext, commonContext, v, bs[y], getter, setter); err != nil {
if err := exec.fill(groupContext, commonContext, v, exec.ret.bsFromEmptyList[x][y], getter, setter); err != nil {
return err
}
exec.ret.setGroupNotEmpty(x, y)
Expand All @@ -338,7 +337,7 @@ func (exec *aggregatorFromBytesToFixed[to]) BulkFill(

for i, j := uint64(0), uint64(length); i < j; i++ {
v, _ := exec.arg.w.GetStrValue(i)
if err := exec.fill(groupContext, commonContext, v, bs[y], getter, setter); err != nil {
if err := exec.fill(groupContext, commonContext, v, exec.ret.bsFromEmptyList[x][y], getter, setter); err != nil {
return err
}
exec.ret.setGroupNotEmpty(x, y)
Expand Down Expand Up @@ -369,13 +368,12 @@ func (exec *aggregatorFromBytesToFixed[to]) distinctBulkFill(
return err
}

bs := exec.ret.getEmptyListOnX(x)
if exec.arg.w.WithAnyNullValue() {
for i, j := uint64(0), uint64(length); i < j; i++ {
if needs[i] {
v, null := exec.arg.w.GetStrValue(i)
if !null {
if err = exec.fill(groupContext, commonContext, v, bs[y], getter, setter); err != nil {
if err = exec.fill(groupContext, commonContext, v, exec.ret.bsFromEmptyList[x][y], getter, setter); err != nil {
return err
}
exec.ret.setGroupNotEmpty(x, y)
Expand All @@ -388,7 +386,7 @@ func (exec *aggregatorFromBytesToFixed[to]) distinctBulkFill(
for i, j := uint64(0), uint64(length); i < j; i++ {
if needs[i] {
v, _ := exec.arg.w.GetStrValue(i)
if err = exec.fill(groupContext, commonContext, v, bs[y], getter, setter); err != nil {
if err = exec.fill(groupContext, commonContext, v, exec.ret.bsFromEmptyList[x][y], getter, setter); err != nil {
return err
}
exec.ret.setGroupNotEmpty(x, y)
Expand All @@ -410,7 +408,6 @@ func (exec *aggregatorFromBytesToFixed[to]) BatchFill(
getter := exec.ret.get
setter := exec.ret.set
commonContext := exec.execContext.getCommonContext()
bs := exec.ret.getEmptyList()

if vectors[0].IsConst() {
value := vectors[0].GetBytesAt(0)
Expand All @@ -419,7 +416,7 @@ func (exec *aggregatorFromBytesToFixed[to]) BatchFill(
idx := int(group - 1)
x, y := exec.ret.updateNextAccessIdx(idx)
if err := exec.fill(
exec.execContext.getGroupContext(idx), commonContext, value, bs[x][y], getter, setter); err != nil {
exec.execContext.getGroupContext(idx), commonContext, value, exec.ret.bsFromEmptyList[x][y], getter, setter); err != nil {
return err
}
exec.ret.setGroupNotEmpty(x, y)
Expand All @@ -437,7 +434,7 @@ func (exec *aggregatorFromBytesToFixed[to]) BatchFill(
groupIdx := int(groups[idx] - 1)
x, y := exec.ret.updateNextAccessIdx(groupIdx)
if err := exec.fill(
exec.execContext.getGroupContext(groupIdx), commonContext, v, bs[x][y], getter, setter); err != nil {
exec.execContext.getGroupContext(groupIdx), commonContext, v, exec.ret.bsFromEmptyList[x][y], getter, setter); err != nil {
return err
}
exec.ret.setGroupNotEmpty(x, y)
Expand All @@ -454,7 +451,7 @@ func (exec *aggregatorFromBytesToFixed[to]) BatchFill(
groupIdx := int(groups[idx] - 1)
x, y := exec.ret.updateNextAccessIdx(groupIdx)
if err := exec.fill(
exec.execContext.getGroupContext(groupIdx), commonContext, v, bs[x][y], getter, setter); err != nil {
exec.execContext.getGroupContext(groupIdx), commonContext, v, exec.ret.bsFromEmptyList[x][y], getter, setter); err != nil {
return err
}
exec.ret.setGroupNotEmpty(x, y)
Expand All @@ -469,7 +466,6 @@ func (exec *aggregatorFromBytesToFixed[to]) distinctBatchFill(
getter := exec.ret.get
setter := exec.ret.set
commonContext := exec.execContext.getCommonContext()
bs := exec.ret.getEmptyList()

needs, err := exec.distinctHash.batchFill(vectors, offset, groups)
if err != nil {
Expand All @@ -483,7 +479,7 @@ func (exec *aggregatorFromBytesToFixed[to]) distinctBatchFill(
idx := int(group - 1)
x, y := exec.ret.updateNextAccessIdx(idx)
if err = exec.fill(
exec.execContext.getGroupContext(idx), commonContext, value, bs[x][y], getter, setter); err != nil {
exec.execContext.getGroupContext(idx), commonContext, value, exec.ret.bsFromEmptyList[x][y], getter, setter); err != nil {
return err
}
exec.ret.setGroupNotEmpty(x, y)
Expand All @@ -501,7 +497,7 @@ func (exec *aggregatorFromBytesToFixed[to]) distinctBatchFill(
groupIdx := int(groups[idx] - 1)
x, y := exec.ret.updateNextAccessIdx(groupIdx)
if err = exec.fill(
exec.execContext.getGroupContext(groupIdx), commonContext, v, bs[x][y], getter, setter); err != nil {
exec.execContext.getGroupContext(groupIdx), commonContext, v, exec.ret.bsFromEmptyList[x][y], getter, setter); err != nil {
return err
}
exec.ret.setGroupNotEmpty(x, y)
Expand All @@ -518,7 +514,7 @@ func (exec *aggregatorFromBytesToFixed[to]) distinctBatchFill(
groupIdx := int(groups[idx] - 1)
x, y := exec.ret.updateNextAccessIdx(groupIdx)
if err = exec.fill(
exec.execContext.getGroupContext(groupIdx), commonContext, v, bs[x][y], getter, setter); err != nil {
exec.execContext.getGroupContext(groupIdx), commonContext, v, exec.ret.bsFromEmptyList[x][y], getter, setter); err != nil {
return err
}
exec.ret.setGroupNotEmpty(x, y)
Expand Down
24 changes: 10 additions & 14 deletions pkg/sql/colexec/aggexec/fromFixedRetBytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,12 +341,11 @@ func (exec *aggregatorFromFixedToBytes[from]) BulkFill(
}

exec.arg.prepare(vectors[0])
bs := exec.ret.getEmptyListOnX(x)
if exec.arg.w.WithAnyNullValue() {
for i, j := uint64(0), uint64(length); i < j; i++ {
v, null := exec.arg.w.GetValue(i)
if !null {
if err := exec.fill(groupContext, commonContext, v, bs[y], getter, setter); err != nil {
if err := exec.fill(groupContext, commonContext, v, exec.ret.bsFromEmptyList[x][y], getter, setter); err != nil {
return err
}
exec.ret.setGroupNotEmpty(x, y)
Expand All @@ -357,7 +356,7 @@ func (exec *aggregatorFromFixedToBytes[from]) BulkFill(

vs := exec.arg.w.UnSafeGetAllValue()
for _, v := range vs {
if err := exec.fill(groupContext, commonContext, v, bs[y], getter, setter); err != nil {
if err := exec.fill(groupContext, commonContext, v, exec.ret.bsFromEmptyList[x][y], getter, setter); err != nil {
return err
}
exec.ret.setGroupNotEmpty(x, y)
Expand Down Expand Up @@ -388,13 +387,12 @@ func (exec *aggregatorFromFixedToBytes[from]) distinctBulkFill(
return err
}

bs := exec.ret.getEmptyListOnX(x)
if exec.arg.w.WithAnyNullValue() {
for i, j := uint64(0), uint64(length); i < j; i++ {
if needs[i] {
v, null := exec.arg.w.GetValue(i)
if !null {
if err = exec.fill(groupContext, commonContext, v, bs[y], getter, setter); err != nil {
if err = exec.fill(groupContext, commonContext, v, exec.ret.bsFromEmptyList[x][y], getter, setter); err != nil {
return err
}
exec.ret.setGroupNotEmpty(x, y)
Expand All @@ -407,7 +405,7 @@ func (exec *aggregatorFromFixedToBytes[from]) distinctBulkFill(
vs := exec.arg.w.UnSafeGetAllValue()
for i, v := range vs {
if needs[i] {
if err = exec.fill(groupContext, commonContext, v, bs[y], getter, setter); err != nil {
if err = exec.fill(groupContext, commonContext, v, exec.ret.bsFromEmptyList[x][y], getter, setter); err != nil {
return err
}
exec.ret.setGroupNotEmpty(x, y)
Expand All @@ -429,7 +427,6 @@ func (exec *aggregatorFromFixedToBytes[from]) BatchFill(
getter := exec.ret.get
setter := exec.ret.set
commonContext := exec.execContext.getCommonContext()
bs := exec.ret.getEmptyList()

if vectors[0].IsConst() {
value := vector.MustFixedColWithTypeCheck[from](vectors[0])[0]
Expand All @@ -438,7 +435,7 @@ func (exec *aggregatorFromFixedToBytes[from]) BatchFill(
idx := int(group - 1)
x, y := exec.ret.updateNextAccessIdx(idx)
if err := exec.fill(
exec.execContext.getGroupContext(idx), commonContext, value, bs[x][y], getter, setter); err != nil {
exec.execContext.getGroupContext(idx), commonContext, value, exec.ret.bsFromEmptyList[x][y], getter, setter); err != nil {
return err
}
exec.ret.setGroupNotEmpty(x, y)
Expand All @@ -456,7 +453,7 @@ func (exec *aggregatorFromFixedToBytes[from]) BatchFill(
groupIdx := int(groups[idx] - 1)
x, y := exec.ret.updateNextAccessIdx(groupIdx)
if err := exec.fill(
exec.execContext.getGroupContext(groupIdx), commonContext, v, bs[x][y], getter, setter); err != nil {
exec.execContext.getGroupContext(groupIdx), commonContext, v, exec.ret.bsFromEmptyList[x][y], getter, setter); err != nil {
return err
}
exec.ret.setGroupNotEmpty(x, y)
Expand All @@ -473,7 +470,7 @@ func (exec *aggregatorFromFixedToBytes[from]) BatchFill(
groupIdx := int(groups[idx] - 1)
x, y := exec.ret.updateNextAccessIdx(groupIdx)
if err := exec.fill(
exec.execContext.getGroupContext(groupIdx), commonContext, vs[i], bs[x][y], getter, setter); err != nil {
exec.execContext.getGroupContext(groupIdx), commonContext, vs[i], exec.ret.bsFromEmptyList[x][y], getter, setter); err != nil {
return err
}
exec.ret.setGroupNotEmpty(x, y)
Expand All @@ -488,7 +485,6 @@ func (exec *aggregatorFromFixedToBytes[from]) distinctBatchFill(
getter := exec.ret.get
setter := exec.ret.set
commonContext := exec.execContext.getCommonContext()
bs := exec.ret.getEmptyList()

needs, err := exec.distinctHash.batchFill(vectors, offset, groups)
if err != nil {
Expand All @@ -502,7 +498,7 @@ func (exec *aggregatorFromFixedToBytes[from]) distinctBatchFill(
idx := int(group - 1)
x, y := exec.ret.updateNextAccessIdx(idx)
if err = exec.fill(
exec.execContext.getGroupContext(idx), commonContext, value, bs[x][y], getter, setter); err != nil {
exec.execContext.getGroupContext(idx), commonContext, value, exec.ret.bsFromEmptyList[x][y], getter, setter); err != nil {
return err
}
exec.ret.setGroupNotEmpty(x, y)
Expand All @@ -520,7 +516,7 @@ func (exec *aggregatorFromFixedToBytes[from]) distinctBatchFill(
groupIdx := int(groups[idx] - 1)
x, y := exec.ret.updateNextAccessIdx(groupIdx)
if err = exec.fill(
exec.execContext.getGroupContext(groupIdx), commonContext, v, bs[x][y], getter, setter); err != nil {
exec.execContext.getGroupContext(groupIdx), commonContext, v, exec.ret.bsFromEmptyList[x][y], getter, setter); err != nil {
return err
}
exec.ret.setGroupNotEmpty(x, y)
Expand All @@ -537,7 +533,7 @@ func (exec *aggregatorFromFixedToBytes[from]) distinctBatchFill(
groupIdx := int(groups[idx] - 1)
x, y := exec.ret.updateNextAccessIdx(groupIdx)
if err = exec.fill(
exec.execContext.getGroupContext(groupIdx), commonContext, vs[i], bs[x][y], getter, setter); err != nil {
exec.execContext.getGroupContext(groupIdx), commonContext, vs[i], exec.ret.bsFromEmptyList[x][y], getter, setter); err != nil {
return err
}
exec.ret.setGroupNotEmpty(x, y)
Expand Down
Loading

0 comments on commit e271f9b

Please sign in to comment.