Skip to content

Commit

Permalink
Get rid of ProcessFacetsQuery, use one common ProcessQuery (#75)
Browse files Browse the repository at this point in the history
Final `ProcessQuery` unification. One common method
  • Loading branch information
pdelewski authored May 10, 2024
1 parent fa23264 commit 31cd5f1
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 25 deletions.
28 changes: 12 additions & 16 deletions quesma/clickhouse/quesma_communicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,28 @@ func (lm *LogManager) Query(ctx context.Context, query string) (*sql.Rows, error
return rows, err
}

// GetAllColumns - returns all columns for a given table including non-schema fields
func (lm *LogManager) GetAllColumns(table *Table, query *model.Query) []string {
columns, err := table.extractColumns(query, true)
if err != nil {
logger.Error().Msgf("Failed to extract columns from query: %v", err)
return nil
}
return columns
}

// ProcessQuery - only WHERE clause
// TODO query param should be type safe Query representing all parts of
// sql statement that were already parsed and not string from which
// we have to extract again different parts like where clause and columns to build a proper result
func (lm *LogManager) ProcessQuery(ctx context.Context, table *Table, query *model.Query) ([]model.QueryResultRow, error) {
func (lm *LogManager) ProcessQuery(ctx context.Context, table *Table, query *model.Query, columns []string) ([]model.QueryResultRow, error) {
colNames, err := table.extractColumns(query, false)
rowToScan := make([]interface{}, len(colNames)+len(query.NonSchemaFields))
if err != nil {
return nil, err
}

resultColumns, err := table.extractColumns(query, true)
if err != nil {
return nil, err
}
rows, err := executeQuery(ctx, lm, table.Name, query.StringFromColumns(colNames), resultColumns, rowToScan)
rows, err := executeQuery(ctx, lm, table.Name, query.StringFromColumns(colNames), columns, rowToScan)
if err == nil {
for _, row := range rows {
row.Index = table.Name
Expand All @@ -50,16 +56,6 @@ func (lm *LogManager) ProcessQuery(ctx context.Context, table *Table, query *mod
return rows, err
}

// TODO add support for autocomplete for attributes, if we'll find it needed
func (lm *LogManager) ProcessFacetsQuery(ctx context.Context, table *Table, query *model.Query) ([]model.QueryResultRow, error) {
colNames, err := table.extractColumns(query, false)
if err != nil {
return nil, err
}
rowToScan := make([]interface{}, len(colNames)+len(query.NonSchemaFields))
return executeQuery(ctx, lm, table.Name, query.StringFromColumns(colNames), []string{"key", "doc_count"}, rowToScan)
}

var random = rand.New(rand.NewSource(time.Now().UnixNano()))

const slowQueryThreshold = 30 * time.Second
Expand Down
16 changes: 8 additions & 8 deletions quesma/quesma/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,14 +241,14 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin
fieldName = "*"
}
listQuery := queryTranslator.BuildNRowsQuery(fieldName, simpleQuery, queryInfo.Size)
hitsFallback, err = q.logManager.ProcessQuery(ctx, table, listQuery)
hitsFallback, err = q.logManager.ProcessQuery(ctx, table, listQuery, q.logManager.GetAllColumns(table, listQuery))
if err != nil {
logger.ErrorWithCtx(ctx).Msgf("error processing fallback query. Err: %v, query: %+v", err, listQuery)
pushSecondaryInfo(q.quesmaManagementConsole, id, path, body, translatedQueryBody, responseBody, startTime)
return responseBody, err
}
countQuery := queryTranslator.BuildSimpleCountQuery(simpleQuery.Sql.Stmt)
countResult, err := q.logManager.ProcessQuery(ctx, table, countQuery)
countResult, err := q.logManager.ProcessQuery(ctx, table, countQuery, q.logManager.GetAllColumns(table, listQuery))
if err != nil {
logger.ErrorWithCtx(ctx).Msgf("error processing count query. Err: %v, query: %+v", err, countQuery)
pushSecondaryInfo(q.quesmaManagementConsole, id, path, body, translatedQueryBody, responseBody, startTime)
Expand Down Expand Up @@ -461,26 +461,26 @@ func (q *QueryRunner) searchWorkerCommon(ctx context.Context, queryTranslator IQ
switch queryInfo.Typ {
case model.CountAsync:
fullQuery = queryTranslator.BuildSimpleCountQuery(simpleQuery.Sql.Stmt)
hits, err = q.logManager.ProcessQuery(dbQueryCtx, table, fullQuery)
hits, err = q.logManager.ProcessQuery(dbQueryCtx, table, fullQuery, q.logManager.GetAllColumns(table, fullQuery))

case model.Facets, model.FacetsNumeric:
// queryInfo = (Facets, fieldName, Limit results, Limit last rows to look into)
fullQuery = queryTranslator.BuildFacetsQuery(queryInfo.FieldName, simpleQuery, queryInfo.I2)
hits, err = q.logManager.ProcessFacetsQuery(dbQueryCtx, table, fullQuery)
hits, err = q.logManager.ProcessQuery(dbQueryCtx, table, fullQuery, []string{"key", "doc_count"})

case model.ListByField:
// queryInfo = (ListByField, fieldName, 0, LIMIT)
fullQuery = queryTranslator.BuildNRowsQuery(queryInfo.FieldName, simpleQuery, queryInfo.I2)
hits, err = q.logManager.ProcessQuery(dbQueryCtx, table, fullQuery)
hits, err = q.logManager.ProcessQuery(dbQueryCtx, table, fullQuery, q.logManager.GetAllColumns(table, fullQuery))

case model.ListAllFields:
// queryInfo = (ListAllFields, "*", 0, LIMIT)
fullQuery = queryTranslator.BuildNRowsQuery("*", simpleQuery, queryInfo.I2)
hits, err = q.logManager.ProcessQuery(dbQueryCtx, table, fullQuery)
hits, err = q.logManager.ProcessQuery(dbQueryCtx, table, fullQuery, q.logManager.GetAllColumns(table, fullQuery))

case model.Normal:
fullQuery = queryTranslator.BuildSimpleSelectQuery(simpleQuery.Sql.Stmt, queryInfo.I2)
hits, err = q.logManager.ProcessQuery(dbQueryCtx, table, fullQuery)
hits, err = q.logManager.ProcessQuery(dbQueryCtx, table, fullQuery, q.logManager.GetAllColumns(table, fullQuery))

default:
logger.ErrorWithCtx(ctx).Msgf("unknown query type: %v, query body: %v", queryInfo.Typ, body)
Expand Down Expand Up @@ -543,7 +543,7 @@ func (q *QueryRunner) searchAggregationWorkerCommon(ctx context.Context, aggrega
for _, agg := range aggregations {
logger.InfoWithCtx(ctx).Msg(agg.String()) // I'd keep for now until aggregations work fully
sqls += agg.Query.String() + "\n"
rows, err := q.logManager.ProcessQuery(dbQueryCtx, table, &agg.Query)
rows, err := q.logManager.ProcessQuery(dbQueryCtx, table, &agg.Query, q.logManager.GetAllColumns(table, &agg.Query))
if err != nil {
logger.ErrorWithCtx(ctx).Msg(err.Error())
continue
Expand Down
2 changes: 1 addition & 1 deletion quesma/quesma/termsenum/terms_enum.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func handleTermsEnumRequest(ctx context.Context, reqBody []byte, qt *queryparser
dbQueryCtx, cancel := context.WithCancel(ctx)
// TODO this will be used to cancel goroutine that is executing the query
_ = cancel
if rows, err2 := qt.ClickhouseLM.ProcessQuery(dbQueryCtx, qt.Table, selectQuery); err2 != nil {
if rows, err2 := qt.ClickhouseLM.ProcessQuery(dbQueryCtx, qt.Table, selectQuery, qt.ClickhouseLM.GetAllColumns(qt.Table, selectQuery)); err2 != nil {
logger.Error().Msgf("terms enum failed - error processing SQL query [%s]", err2)
result, err = json.Marshal(emptyTermsEnumResponse())
} else {
Expand Down

0 comments on commit 31cd5f1

Please sign in to comment.