diff --git a/quesma/clickhouse/quesma_communicator.go b/quesma/clickhouse/quesma_communicator.go index 7acf63e5c..0a6e01e25 100644 --- a/quesma/clickhouse/quesma_communicator.go +++ b/quesma/clickhouse/quesma_communicator.go @@ -26,28 +26,17 @@ func (lm *LogManager) Query(ctx context.Context, query string) (*sql.Rows, error return rows, err } -// GetAllColumns - returns all columns for a given table including non-schema fields -func (lm *LogManager) GetAllColumns(table *Table, query *model.Query) []string { - columns, err := table.extractColumns(query, true) - if err != nil { - logger.Error().Msgf("Failed to extract columns from query: %v", err) - return nil - } - return columns -} - -// ProcessQuery - only WHERE clause +// ProcessSimpleSelectQuery - only WHERE clause // TODO query param should be type safe Query representing all parts of // sql statement that were already parsed and not string from which // we have to extract again different parts like where clause and columns to build a proper result -func (lm *LogManager) ProcessQuery(ctx context.Context, table *Table, query *model.Query, columns []string) ([]model.QueryResultRow, error) { +func (lm *LogManager) ProcessSelectQuery(ctx context.Context, table *Table, query *model.Query) ([]model.QueryResultRow, error) { colNames, err := table.extractColumns(query, false) rowToScan := make([]interface{}, len(colNames)+len(query.NonSchemaFields)) if err != nil { return nil, err } - - rows, err := executeQuery(ctx, lm, table.Name, query.StringFromColumns(colNames), columns, rowToScan) + rows, err := executeQuery(ctx, lm, table.Name, query.StringFromColumns(colNames), append(colNames, query.NonSchemaFields...), rowToScan) if err == nil { for _, row := range rows { row.Index = table.Name @@ -56,6 +45,16 @@ func (lm *LogManager) ProcessQuery(ctx context.Context, table *Table, query *mod return rows, err } +// TODO add support for autocomplete for attributes, if we'll find it needed +func (lm *LogManager) ProcessFacetsQuery(ctx context.Context, table *Table, query *model.Query) ([]model.QueryResultRow, error) { + colNames, err := table.extractColumns(query, false) + if err != nil { + return nil, err + } + rowToScan := make([]interface{}, len(colNames)+len(query.NonSchemaFields)) + return executeQuery(ctx, lm, table.Name, query.StringFromColumns(colNames), []string{"key", "doc_count"}, rowToScan) +} + var random = rand.New(rand.NewSource(time.Now().UnixNano())) const slowQueryThreshold = 30 * time.Second @@ -115,6 +114,24 @@ func executeQuery(ctx context.Context, lm *LogManager, tableName string, queryAs return res, err } +func (lm *LogManager) ProcessAutocompleteSuggestionsQuery(ctx context.Context, table *Table, query *model.Query) ([]model.QueryResultRow, error) { + colNames, err := table.extractColumns(query, false) + if err != nil { + return nil, err + } + rowToScan := make([]interface{}, len(colNames)+len(query.NonSchemaFields)) + return executeQuery(ctx, lm, table.Name, query.String(), query.Fields, rowToScan) +} + +func (lm *LogManager) ProcessGeneralAggregationQuery(ctx context.Context, table *Table, query *model.Query) ([]model.QueryResultRow, error) { + colNames, err := table.extractColumns(query, true) + if err != nil { + return nil, err + } + rowToScan := make([]interface{}, len(colNames)) + return executeQuery(ctx, lm, table.Name, query.String(), colNames, rowToScan) +} + // 'selectFields' are all values that we return from the query, both columns and non-schema fields, // like e.g. count(), or toInt8(boolField) func read(tableName string, rows *sql.Rows, selectFields []string, rowToScan []interface{}) ([]model.QueryResultRow, error) { diff --git a/quesma/model/query.go b/quesma/model/query.go index 0021114f6..9797da988 100644 --- a/quesma/model/query.go +++ b/quesma/model/query.go @@ -91,14 +91,11 @@ func (q *Query) String() string { func (q *Query) StringFromColumns(colNames []string) string { var sb strings.Builder sb.WriteString("SELECT ") - if q.IsDistinct { - sb.WriteString("DISTINCT ") - } for i, field := range colNames { - if field == "*" || field == EmptyFieldSelection { - sb.WriteString(field) - } else { + if field != EmptyFieldSelection { sb.WriteString(strconv.Quote(field)) + } else { + sb.WriteString(field) } if i < len(colNames)-1 || len(q.NonSchemaFields) > 0 { sb.WriteString(", ") @@ -115,25 +112,6 @@ func (q *Query) StringFromColumns(colNames []string) string { where = "" } sb.WriteString(" FROM " + q.FromClause + where + q.WhereClause + " " + strings.Join(q.SuffixClauses, " ")) - if len(q.GroupByFields) > 0 { - sb.WriteString(" GROUP BY (") - for i, field := range q.GroupByFields { - sb.WriteString(field) - if i < len(q.GroupByFields)-1 { - sb.WriteString(", ") - } - } - sb.WriteString(")") - - sb.WriteString(" ORDER BY (") - for i, field := range q.GroupByFields { - sb.WriteString(field) - if i < len(q.GroupByFields)-1 { - sb.WriteString(", ") - } - } - sb.WriteString(")") - } return sb.String() } diff --git a/quesma/quesma/search.go b/quesma/quesma/search.go index cc0dfd060..7dd3767c5 100644 --- a/quesma/quesma/search.go +++ b/quesma/quesma/search.go @@ -241,14 +241,14 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin fieldName = "*" } listQuery := queryTranslator.BuildNRowsQuery(fieldName, simpleQuery, queryInfo.Size) - hitsFallback, err = q.logManager.ProcessQuery(ctx, table, listQuery, q.logManager.GetAllColumns(table, listQuery)) + hitsFallback, err = q.logManager.ProcessSelectQuery(ctx, table, listQuery) if err != nil { logger.ErrorWithCtx(ctx).Msgf("error processing fallback query. Err: %v, query: %+v", err, listQuery) pushSecondaryInfo(q.quesmaManagementConsole, id, path, body, translatedQueryBody, responseBody, startTime) return responseBody, err } countQuery := queryTranslator.BuildSimpleCountQuery(simpleQuery.Sql.Stmt) - countResult, err := q.logManager.ProcessQuery(ctx, table, countQuery, q.logManager.GetAllColumns(table, listQuery)) + countResult, err := q.logManager.ProcessSelectQuery(ctx, table, countQuery) if err != nil { logger.ErrorWithCtx(ctx).Msgf("error processing count query. Err: %v, query: %+v", err, countQuery) pushSecondaryInfo(q.quesmaManagementConsole, id, path, body, translatedQueryBody, responseBody, startTime) @@ -461,26 +461,26 @@ func (q *QueryRunner) searchWorkerCommon(ctx context.Context, queryTranslator IQ switch queryInfo.Typ { case model.CountAsync: fullQuery = queryTranslator.BuildSimpleCountQuery(simpleQuery.Sql.Stmt) - hits, err = q.logManager.ProcessQuery(dbQueryCtx, table, fullQuery, q.logManager.GetAllColumns(table, fullQuery)) + hits, err = q.logManager.ProcessSelectQuery(dbQueryCtx, table, fullQuery) case model.Facets, model.FacetsNumeric: // queryInfo = (Facets, fieldName, Limit results, Limit last rows to look into) fullQuery = queryTranslator.BuildFacetsQuery(queryInfo.FieldName, simpleQuery, queryInfo.I2) - hits, err = q.logManager.ProcessQuery(dbQueryCtx, table, fullQuery, []string{"key", "doc_count"}) + hits, err = q.logManager.ProcessFacetsQuery(dbQueryCtx, table, fullQuery) case model.ListByField: // queryInfo = (ListByField, fieldName, 0, LIMIT) fullQuery = queryTranslator.BuildNRowsQuery(queryInfo.FieldName, simpleQuery, queryInfo.I2) - hits, err = q.logManager.ProcessQuery(dbQueryCtx, table, fullQuery, q.logManager.GetAllColumns(table, fullQuery)) + hits, err = q.logManager.ProcessSelectQuery(dbQueryCtx, table, fullQuery) case model.ListAllFields: // queryInfo = (ListAllFields, "*", 0, LIMIT) fullQuery = queryTranslator.BuildNRowsQuery("*", simpleQuery, queryInfo.I2) - hits, err = q.logManager.ProcessQuery(dbQueryCtx, table, fullQuery, q.logManager.GetAllColumns(table, fullQuery)) + hits, err = q.logManager.ProcessSelectQuery(dbQueryCtx, table, fullQuery) case model.Normal: fullQuery = queryTranslator.BuildSimpleSelectQuery(simpleQuery.Sql.Stmt, queryInfo.I2) - hits, err = q.logManager.ProcessQuery(dbQueryCtx, table, fullQuery, q.logManager.GetAllColumns(table, fullQuery)) + hits, err = q.logManager.ProcessSelectQuery(dbQueryCtx, table, fullQuery) default: logger.ErrorWithCtx(ctx).Msgf("unknown query type: %v, query body: %v", queryInfo.Typ, body) @@ -543,7 +543,7 @@ func (q *QueryRunner) searchAggregationWorkerCommon(ctx context.Context, aggrega for _, agg := range aggregations { logger.InfoWithCtx(ctx).Msg(agg.String()) // I'd keep for now until aggregations work fully sqls += agg.Query.String() + "\n" - rows, err := q.logManager.ProcessQuery(dbQueryCtx, table, &agg.Query, q.logManager.GetAllColumns(table, &agg.Query)) + rows, err := q.logManager.ProcessGeneralAggregationQuery(dbQueryCtx, table, &agg.Query) if err != nil { logger.ErrorWithCtx(ctx).Msg(err.Error()) continue diff --git a/quesma/quesma/termsenum/terms_enum.go b/quesma/quesma/termsenum/terms_enum.go index 5dd115baa..32753d279 100644 --- a/quesma/quesma/termsenum/terms_enum.go +++ b/quesma/quesma/termsenum/terms_enum.go @@ -37,7 +37,7 @@ func handleTermsEnumRequest(ctx context.Context, reqBody []byte, qt *queryparser dbQueryCtx, cancel := context.WithCancel(ctx) // TODO this will be used to cancel goroutine that is executing the query _ = cancel - if rows, err2 := qt.ClickhouseLM.ProcessQuery(dbQueryCtx, qt.Table, selectQuery, qt.ClickhouseLM.GetAllColumns(qt.Table, selectQuery)); err2 != nil { + if rows, err2 := qt.ClickhouseLM.ProcessAutocompleteSuggestionsQuery(dbQueryCtx, qt.Table, selectQuery); err2 != nil { logger.Error().Msgf("terms enum failed - error processing SQL query [%s]", err2) result, err = json.Marshal(emptyTermsEnumResponse()) } else {